Skip to content

Commit 9cc7fc2

Browse files
DATAMONGO-1855 - Polishing
Introduce base class to share code between imperative and reactive GridFs. Original Pull Request: #637
1 parent f40861b commit 9cc7fc2

File tree

7 files changed

+412
-247
lines changed

7 files changed

+412
-247
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java

Lines changed: 72 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}.
5050
*
5151
* @author Mark Paluch
52+
* @author Christoph Strobl
5253
* @since 2.2
5354
*/
5455
@RequiredArgsConstructor
@@ -75,10 +76,10 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
7576
.get();
7677

7778
// see DEMAND
78-
private volatile long demand;
79+
volatile long demand;
7980

8081
// see SUBSCRIBED
81-
private volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED;
82+
volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED;
8283

8384
/*
8485
* (non-Javadoc)
@@ -94,20 +95,23 @@ public Publisher<Integer> read(ByteBuffer dst) {
9495
try {
9596

9697
if (error != null) {
98+
9799
sink.error(error);
98100
return;
99101
}
100102

101103
if (bytecount == -1) {
104+
102105
sink.success(-1);
103106
return;
104107
}
105108

106109
ByteBuffer byteBuffer = db.asByteBuffer();
107110
int toWrite = byteBuffer.remaining();
108-
dst.put(byteBuffer);
109111

112+
dst.put(byteBuffer);
110113
sink.success(toWrite);
114+
111115
} catch (Exception e) {
112116
sink.error(e);
113117
} finally {
@@ -127,6 +131,7 @@ public Publisher<Integer> read(ByteBuffer dst) {
127131
public Publisher<Success> close() {
128132

129133
return Mono.create(sink -> {
134+
130135
cancelled = true;
131136

132137
if (error != null) {
@@ -141,6 +146,7 @@ public Publisher<Success> close() {
141146
protected void request(int n) {
142147

143148
if (complete) {
149+
144150
terminatePendingReads();
145151
return;
146152
}
@@ -150,67 +156,9 @@ protected void request(int n) {
150156
if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) {
151157

152158
if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) {
153-
154-
buffers.subscribe(new CoreSubscriber<DataBuffer>() {
155-
156-
@Override
157-
public Context currentContext() {
158-
return subscriberContext;
159-
}
160-
161-
@Override
162-
public void onSubscribe(Subscription s) {
163-
subscription = s;
164-
165-
Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
166-
s.request(1);
167-
}
168-
169-
@Override
170-
public void onNext(DataBuffer dataBuffer) {
171-
172-
if (cancelled || complete) {
173-
DataBufferUtils.release(dataBuffer);
174-
Operators.onNextDropped(dataBuffer, subscriberContext);
175-
return;
176-
}
177-
178-
BiConsumer<DataBuffer, Integer> poll = readRequests.poll();
179-
180-
if (poll == null) {
181-
182-
DataBufferUtils.release(dataBuffer);
183-
Operators.onNextDropped(dataBuffer, subscriberContext);
184-
subscription.cancel();
185-
return;
186-
}
187-
188-
poll.accept(dataBuffer, dataBuffer.readableByteCount());
189-
190-
requestFromSubscription(subscription);
191-
}
192-
193-
@Override
194-
public void onError(Throwable t) {
195-
196-
if (cancelled || complete) {
197-
Operators.onErrorDropped(t, subscriberContext);
198-
return;
199-
}
200-
201-
error = t;
202-
complete = true;
203-
terminatePendingReads();
204-
}
205-
206-
@Override
207-
public void onComplete() {
208-
209-
complete = true;
210-
terminatePendingReads();
211-
}
212-
});
159+
buffers.subscribe(new DataBufferCoreSubscriber());
213160
}
161+
214162
} else {
215163

216164
Subscription subscription = this.subscription;
@@ -245,4 +193,65 @@ void terminatePendingReads() {
245193
readers.accept(factory.wrap(new byte[0]), -1);
246194
}
247195
}
196+
197+
private class DataBufferCoreSubscriber implements CoreSubscriber<DataBuffer> {
198+
199+
@Override
200+
public Context currentContext() {
201+
return AsyncInputStreamAdapter.this.subscriberContext;
202+
}
203+
204+
@Override
205+
public void onSubscribe(Subscription s) {
206+
207+
AsyncInputStreamAdapter.this.subscription = s;
208+
209+
Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
210+
s.request(1);
211+
}
212+
213+
@Override
214+
public void onNext(DataBuffer dataBuffer) {
215+
216+
if (cancelled || complete) {
217+
DataBufferUtils.release(dataBuffer);
218+
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
219+
return;
220+
}
221+
222+
BiConsumer<DataBuffer, Integer> poll = AsyncInputStreamAdapter.this.readRequests.poll();
223+
224+
if (poll == null) {
225+
226+
DataBufferUtils.release(dataBuffer);
227+
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
228+
subscription.cancel();
229+
return;
230+
}
231+
232+
poll.accept(dataBuffer, dataBuffer.readableByteCount());
233+
234+
requestFromSubscription(subscription);
235+
}
236+
237+
@Override
238+
public void onError(Throwable t) {
239+
240+
if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) {
241+
Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext);
242+
return;
243+
}
244+
245+
AsyncInputStreamAdapter.this.error = t;
246+
AsyncInputStreamAdapter.this.complete = true;
247+
terminatePendingReads();
248+
}
249+
250+
@Override
251+
public void onComplete() {
252+
253+
AsyncInputStreamAdapter.this.complete = true;
254+
terminatePendingReads();
255+
}
256+
}
248257
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
* Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}.
4040
*
4141
* @author Mark Paluch
42+
* @author Christoph Strobl
4243
* @since 2.2
4344
*/
4445
class DataBufferPublisherAdapter {
@@ -51,7 +52,7 @@ class DataBufferPublisherAdapter {
5152
* @param dataBufferFactory must not be {@literal null}.
5253
* @return the resulting {@link Publisher}.
5354
*/
54-
public static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
55+
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
5556

5657
State state = new State(inputStream, dataBufferFactory);
5758

@@ -73,17 +74,17 @@ public static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream,
7374
@RequiredArgsConstructor
7475
static class State {
7576

76-
static final AtomicLongFieldUpdater<State> DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand");
77+
private static final AtomicLongFieldUpdater<State> DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand");
7778

78-
static final AtomicIntegerFieldUpdater<State> STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state");
79+
private static final AtomicIntegerFieldUpdater<State> STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state");
7980

80-
static final AtomicIntegerFieldUpdater<State> READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read");
81+
private static final AtomicIntegerFieldUpdater<State> READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read");
8182

82-
static final int STATE_OPEN = 0;
83-
static final int STATE_CLOSED = 1;
83+
private static final int STATE_OPEN = 0;
84+
private static final int STATE_CLOSED = 1;
8485

85-
static final int READ_NONE = 0;
86-
static final int READ_IN_PROGRESS = 1;
86+
private static final int READ_NONE = 0;
87+
private static final int READ_IN_PROGRESS = 1;
8788

8889
final AsyncInputStream inputStream;
8990
final DataBufferFactory dataBufferFactory;
@@ -140,66 +141,79 @@ void emitNext(FluxSink<DataBuffer> sink) {
140141
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
141142
ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity());
142143

143-
Mono.from(inputStream.read(intermediate)).subscribe(new CoreSubscriber<Integer>() {
144+
Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate));
145+
}
144146

145-
@Override
146-
public Context currentContext() {
147-
return sink.currentContext();
148-
}
147+
private class BufferCoreSubscriber implements CoreSubscriber<Integer> {
149148

150-
@Override
151-
public void onSubscribe(Subscription s) {
152-
s.request(1);
153-
}
149+
private final FluxSink<DataBuffer> sink;
150+
private final DataBuffer dataBuffer;
151+
private final ByteBuffer intermediate;
154152

155-
@Override
156-
public void onNext(Integer bytes) {
153+
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBuffer dataBuffer, ByteBuffer intermediate) {
157154

158-
if (isClosed()) {
155+
this.sink = sink;
156+
this.dataBuffer = dataBuffer;
157+
this.intermediate = intermediate;
158+
}
159159

160-
onReadDone();
161-
DataBufferUtils.release(dataBuffer);
162-
Operators.onNextDropped(dataBuffer, sink.currentContext());
163-
return;
164-
}
160+
@Override
161+
public Context currentContext() {
162+
return sink.currentContext();
163+
}
165164

166-
intermediate.flip();
167-
dataBuffer.write(intermediate);
165+
@Override
166+
public void onSubscribe(Subscription s) {
167+
s.request(1);
168+
}
168169

169-
sink.next(dataBuffer);
170+
@Override
171+
public void onNext(Integer bytes) {
170172

171-
try {
172-
if (bytes == -1) {
173-
sink.complete();
174-
}
175-
} finally {
176-
onReadDone();
177-
}
173+
if (isClosed()) {
174+
175+
onReadDone();
176+
DataBufferUtils.release(dataBuffer);
177+
Operators.onNextDropped(dataBuffer, sink.currentContext());
178+
return;
178179
}
179180

180-
@Override
181-
public void onError(Throwable t) {
181+
intermediate.flip();
182+
dataBuffer.write(intermediate);
182183

183-
if (isClosed()) {
184+
sink.next(dataBuffer);
184185

185-
Operators.onErrorDropped(t, sink.currentContext());
186-
return;
186+
try {
187+
if (bytes == -1) {
188+
sink.complete();
187189
}
188-
190+
} finally {
189191
onReadDone();
190-
DataBufferUtils.release(dataBuffer);
191-
Operators.onNextDropped(dataBuffer, sink.currentContext());
192-
sink.error(t);
193192
}
193+
}
194194

195-
@Override
196-
public void onComplete() {
195+
@Override
196+
public void onError(Throwable t) {
197197

198-
if (onShouldRead()) {
199-
emitNext(sink);
200-
}
198+
if (isClosed()) {
199+
200+
Operators.onErrorDropped(t, sink.currentContext());
201+
return;
201202
}
202-
});
203+
204+
onReadDone();
205+
DataBufferUtils.release(dataBuffer);
206+
Operators.onNextDropped(dataBuffer, sink.currentContext());
207+
sink.error(t);
208+
}
209+
210+
@Override
211+
public void onComplete() {
212+
213+
if (onShouldRead()) {
214+
emitNext(sink);
215+
}
216+
}
203217
}
204218
}
205219
}

0 commit comments

Comments
 (0)