29
29
30
30
import org .reactivestreams .Publisher ;
31
31
import org .reactivestreams .Subscription ;
32
-
33
32
import org .springframework .core .io .buffer .DataBuffer ;
34
33
import org .springframework .core .io .buffer .DataBufferFactory ;
35
34
35
+ import com .mongodb .reactivestreams .client .Success ;
36
36
import com .mongodb .reactivestreams .client .gridfs .AsyncInputStream ;
37
37
38
38
/**
@@ -56,34 +56,98 @@ class DataBufferPublisherAdapter {
56
56
static Flux <DataBuffer > createBinaryStream (AsyncInputStream inputStream , DataBufferFactory dataBufferFactory ,
57
57
int bufferSize ) {
58
58
59
- State state = new State (inputStream , dataBufferFactory , bufferSize );
59
+ return Flux .usingWhen (Mono .just (new DelegatingAsyncInputStream (inputStream , dataBufferFactory , bufferSize )),
60
+ DataBufferPublisherAdapter ::doRead , AsyncInputStream ::close , (it , err ) -> it .close (), AsyncInputStream ::close );
61
+ }
62
+
63
+ /**
64
+ * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}.
65
+ *
66
+ * @param inputStream the source stream.
67
+ * @return a {@link Flux} emitting data chunks one by one.
68
+ * @since 2.2.1
69
+ */
70
+ private static Flux <DataBuffer > doRead (DelegatingAsyncInputStream inputStream ) {
60
71
61
- return Flux .usingWhen (Mono .just (inputStream ), it -> {
72
+ AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler (inputStream , inputStream .dataBufferFactory ,
73
+ inputStream .bufferSize );
62
74
63
- return Flux .< DataBuffer > create ((sink ) -> {
75
+ return Flux .create ((sink ) -> {
64
76
65
- sink .onDispose (state ::close );
66
- sink .onCancel (state ::close );
77
+ sink .onDispose (streamHandler ::close );
78
+ sink .onCancel (streamHandler ::close );
67
79
68
- sink .onRequest (n -> {
69
- state .request (sink , n );
70
- });
80
+ sink .onRequest (n -> {
81
+ streamHandler .request (sink , n );
71
82
});
72
- }, AsyncInputStream ::close , (it , err ) -> it .close (), AsyncInputStream ::close ) //
73
- .concatMap (Flux ::just , 1 );
83
+ });
84
+ }
85
+
86
+ /**
87
+ * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading
88
+ * from it, delegating operations on the {@link AsyncInputStream} to the reference instance. <br />
89
+ * Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas.
90
+ *
91
+ * @author Christoph Strobl
92
+ * @since 2.2.1
93
+ */
94
+ private static class DelegatingAsyncInputStream implements AsyncInputStream {
95
+
96
+ private final AsyncInputStream inputStream ;
97
+ private final DataBufferFactory dataBufferFactory ;
98
+ private int bufferSize ;
99
+
100
+ /**
101
+ * @param inputStream the source input stream.
102
+ * @param dataBufferFactory
103
+ * @param bufferSize
104
+ */
105
+ DelegatingAsyncInputStream (AsyncInputStream inputStream , DataBufferFactory dataBufferFactory , int bufferSize ) {
106
+
107
+ this .inputStream = inputStream ;
108
+ this .dataBufferFactory = dataBufferFactory ;
109
+ this .bufferSize = bufferSize ;
110
+ }
111
+
112
+ /*
113
+ * (non-Javadoc)
114
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
115
+ */
116
+ @ Override
117
+ public Publisher <Integer > read (ByteBuffer dst ) {
118
+ return inputStream .read (dst );
119
+ }
120
+
121
+ /*
122
+ * (non-Javadoc)
123
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
124
+ */
125
+ @ Override
126
+ public Publisher <Long > skip (long bytesToSkip ) {
127
+ return inputStream .skip (bytesToSkip );
128
+ }
129
+
130
+ /*
131
+ * (non-Javadoc)
132
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
133
+ */
134
+ @ Override
135
+ public Publisher <Success > close () {
136
+ return inputStream .close ();
137
+ }
74
138
}
75
139
76
140
@ RequiredArgsConstructor
77
- static class State {
141
+ static class AsyncInputStreamHandler {
78
142
79
- private static final AtomicLongFieldUpdater <State > DEMAND = AtomicLongFieldUpdater . newUpdater ( State . class ,
80
- "demand" );
143
+ private static final AtomicLongFieldUpdater <AsyncInputStreamHandler > DEMAND = AtomicLongFieldUpdater
144
+ . newUpdater ( AsyncInputStreamHandler . class , "demand" );
81
145
82
- private static final AtomicIntegerFieldUpdater <State > STATE = AtomicIntegerFieldUpdater . newUpdater ( State . class ,
83
- "state" );
146
+ private static final AtomicIntegerFieldUpdater <AsyncInputStreamHandler > STATE = AtomicIntegerFieldUpdater
147
+ . newUpdater ( AsyncInputStreamHandler . class , "state" );
84
148
85
- private static final AtomicIntegerFieldUpdater <State > READ = AtomicIntegerFieldUpdater . newUpdater ( State . class ,
86
- "read" );
149
+ private static final AtomicIntegerFieldUpdater <AsyncInputStreamHandler > READ = AtomicIntegerFieldUpdater
150
+ . newUpdater ( AsyncInputStreamHandler . class , "read" );
87
151
88
152
private static final int STATE_OPEN = 0 ;
89
153
private static final int STATE_CLOSED = 1 ;
@@ -188,6 +252,7 @@ public Context currentContext() {
188
252
189
253
@ Override
190
254
public void onSubscribe (Subscription s ) {
255
+
191
256
this .subscription = s ;
192
257
s .request (1 );
193
258
}
@@ -203,14 +268,8 @@ public void onNext(Integer bytes) {
203
268
204
269
if (bytes > 0 ) {
205
270
206
- transport .flip ();
207
-
208
- DataBuffer dataBuffer = factory .allocateBuffer (transport .remaining ());
209
- dataBuffer .write (transport );
210
-
211
- transport .clear ();
212
- sink .next (dataBuffer );
213
-
271
+ DataBuffer buffer = readNextChunk ();
272
+ sink .next (buffer );
214
273
decrementDemand ();
215
274
}
216
275
@@ -226,6 +285,18 @@ public void onNext(Integer bytes) {
226
285
subscription .request (1 );
227
286
}
228
287
288
+ private DataBuffer readNextChunk () {
289
+
290
+ transport .flip ();
291
+
292
+ DataBuffer dataBuffer = factory .allocateBuffer (transport .remaining ());
293
+ dataBuffer .write (transport );
294
+
295
+ transport .clear ();
296
+
297
+ return dataBuffer ;
298
+ }
299
+
229
300
@ Override
230
301
public void onError (Throwable t ) {
231
302
0 commit comments