From a601f6cbeac6d179419dd86a2e040f4fda7e5609 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 18 Oct 2019 11:46:29 +0200 Subject: [PATCH 1/7] DATAMONGO-2393 - Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 15b2d67f47..2f40db764d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c4766040c1..9aa26e3402 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index ed39c63e76..bc2ff731e3 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -14,7 +14,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 25cf02b5d5..bf31d9bc0d 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT ../pom.xml From 7f7319a3d4c3d5b1c44300300e3b36e08706abc6 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 16 Oct 2019 11:14:46 +0200 Subject: [PATCH 2/7] DATAMONGO-2393 - Fix test issues related to JUnit5 upgrade. Execution time and test order changed by using JUnit5. This commit fixes some of the issues related to index creation where actually not needed. 
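For reference, the change applied in each affected test fixture is a plain override of autoIndexCreation() in the nested configuration class. A minimal sketch of such a fixture follows; the class and database names are hypothetical, and it assumes a configuration base class (such as AbstractMongoConfiguration, which the touched tests appear to extend) that declares autoIndexCreation():

    import com.mongodb.MongoClient;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.mongodb.config.AbstractMongoConfiguration;

    @Configuration
    class FixtureConfig extends AbstractMongoConfiguration {

        @Override
        public MongoClient mongoClient() {
            return new MongoClient(); // local test instance
        }

        @Override
        protected String getDatabaseName() {
            return "fixture-tests";
        }

        @Override
        protected boolean autoIndexCreation() {
            return false; // these tests do not assert on indexes, so skip index creation
        }
    }

Fixtures that do rely on automatic index creation simply omit the override.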
--- .../data/mongodb/core/MongoTemplateCollationTests.java | 5 +++++ .../data/mongodb/core/MongoTemplateTransactionTests.java | 5 +++++ .../data/mongodb/core/MongoTemplateValidationTests.java | 5 +++++ .../springframework/data/mongodb/core/NoExplicitIdTests.java | 5 +++++ 4 files changed, 20 insertions(+) diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java index 70bf70777c..8950571c0c 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java @@ -60,6 +60,11 @@ public MongoClient mongoClient() { protected String getDatabaseName() { return "collation-tests"; } + + @Override + protected boolean autoIndexCreation() { + return false; + } } @Autowired MongoTemplate template; diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java index 92654379a4..da8ab558ee 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java @@ -85,6 +85,11 @@ protected String getDatabaseName() { return DB_NAME; } + @Override + protected boolean autoIndexCreation() { + return false; + } + @Bean MongoTransactionManager txManager(MongoDbFactory dbFactory) { return new MongoTransactionManager(dbFactory); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java index d0b68acf2e..1983634d06 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java @@ -74,6 +74,11 @@ public MongoClient mongoClient() { protected String getDatabaseName() { return "validation-tests"; } + + @Override + protected boolean autoIndexCreation() { + return false; + } } @Autowired MongoTemplate template; diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java index 19a373068e..5e74309c05 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java @@ -59,6 +59,11 @@ protected String getDatabaseName() { public MongoClient mongoClient() { return MongoTestUtils.client(); } + + @Override + protected boolean autoIndexCreation() { + return false; + } } @Autowired MongoOperations mongoOps; From d5dcc094ea2facf03312d396b466ef84ff86ace0 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 21 Oct 2019 09:14:29 +0200 Subject: [PATCH 3/7] DATAMONGO-2393 - Use drain loop for same-thread processing in GridFS download stream. 
We now rely on an outer drain-loop when GridFS reads complete on the same thread instead of using recursive subscriptions to avoid StackOverflow. Previously, we recursively invoked subscriptions that lead to an increased stack size. --- .../gridfs/DataBufferPublisherAdapter.java | 77 ++++++++++++------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index dfeb1411d5..654f9d7f0e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -32,7 +32,6 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; @@ -51,11 +50,13 @@ class DataBufferPublisherAdapter { * * @param inputStream must not be {@literal null}. * @param dataBufferFactory must not be {@literal null}. + * @param bufferSize read {@code n} bytes per iteration. * @return the resulting {@link Publisher}. */ - static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, + int bufferSize) { - State state = new State(inputStream, dataBufferFactory); + State state = new State(inputStream, dataBufferFactory, bufferSize); return Flux.usingWhen(Mono.just(inputStream), it -> { @@ -92,6 +93,7 @@ static class State { final AsyncInputStream inputStream; final DataBufferFactory dataBufferFactory; + final int bufferSize; // see DEMAND volatile long demand; @@ -105,8 +107,16 @@ static class State { void request(FluxSink sink, long n) { Operators.addCap(DEMAND, this, n); + drainLoop(sink); + } - if (onShouldRead()) { + /** + * Loops while we have demand and while no read is in progress. + * + * @param sink + */ + void drainLoop(FluxSink sink) { + while (onShouldRead()) { emitNext(sink); } } @@ -119,16 +129,16 @@ boolean onWantRead() { return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); } - boolean onReadDone() { - return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); + void onReadDone() { + READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); } long getDemand() { return DEMAND.get(this); } - boolean decrementDemand() { - return DEMAND.decrementAndGet(this) > 0; + void decrementDemand() { + DEMAND.decrementAndGet(this); } void close() { @@ -143,15 +153,15 @@ boolean isClosed() { * Emit the next {@link DataBuffer}. 
* * @param sink + * @return */ - void emitNext(FluxSink sink) { - - DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); - ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + private void emitNext(FluxSink sink) { + ByteBuffer transport = ByteBuffer.allocate(bufferSize); + BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport); try { - Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate)); - } catch (Exception e) { + inputStream.read(transport).subscribe(bufferCoreSubscriber); + } catch (Throwable e) { sink.error(e); } } @@ -159,14 +169,16 @@ void emitNext(FluxSink sink) { private class BufferCoreSubscriber implements CoreSubscriber { private final FluxSink sink; - private final DataBuffer dataBuffer; - private final ByteBuffer intermediate; + private final DataBufferFactory factory; + private final ByteBuffer transport; + private final Thread subscribeThread = Thread.currentThread(); + private volatile Subscription subscription; - BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) { + BufferCoreSubscriber(FluxSink sink, DataBufferFactory factory, ByteBuffer transport) { this.sink = sink; - this.dataBuffer = dataBuffer; - this.intermediate = intermediate; + this.factory = factory; + this.transport = transport; } @Override @@ -176,6 +188,7 @@ public Context currentContext() { @Override public void onSubscribe(Subscription s) { + this.subscription = s; s.request(1); } @@ -185,24 +198,32 @@ public void onNext(Integer bytes) { if (isClosed()) { onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); return; } - intermediate.flip(); - dataBuffer.write(intermediate); + if (bytes > 0) { - sink.next(dataBuffer); - decrementDemand(); + transport.flip(); + + DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); + dataBuffer.write(transport); + + transport.clear(); + sink.next(dataBuffer); + + decrementDemand(); + } try { if (bytes == -1) { sink.complete(); + return; } } finally { onReadDone(); } + + subscription.request(1); } @Override @@ -215,16 +236,14 @@ public void onError(Throwable t) { } onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); sink.error(t); } @Override public void onComplete() { - if (onShouldRead()) { - emitNext(sink); + if (subscribeThread != Thread.currentThread()) { + drainLoop(sink); } } } From 67560672f31962efaa380acc93a5cf3e3d4796f6 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 21 Oct 2019 09:17:34 +0200 Subject: [PATCH 4/7] DATAMONGO-2393 - Fix BufferOverflow in GridFS upload. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AsyncInputStreamAdapter now properly splits and buffers incoming DataBuffers according the read requests of AsyncInputStream.read(…) calls. Previously, the adapter used the input buffer size to be used as the output buffer size. A larger DataBuffer than the transfer buffer handed in through read(…) caused a BufferOverflow. 
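To illustrate the bounded copy that avoids the overflow, here is a simplified, standalone sketch of transferring at most dst.remaining() bytes from a source buffer while keeping the remainder for the next read request. It is only an approximation of the adapter's logic — the actual change additionally tracks the DataBuffer's readPosition and releases fully consumed buffers — and the class name is hypothetical:

    import java.nio.ByteBuffer;

    class BoundedTransferSketch {

        /**
         * Copies at most dst.remaining() bytes from src into dst and returns the
         * number of bytes moved; whatever does not fit stays in src for the next read.
         */
        static int transfer(ByteBuffer src, ByteBuffer dst) {

            int toWrite = Math.min(dst.remaining(), src.remaining());

            int originalLimit = src.limit();
            src.limit(src.position() + toWrite); // cap the source so put(...) cannot overflow dst
            dst.put(src);
            src.limit(originalLimit);

            return toWrite;
        }

        public static void main(String[] args) {

            ByteBuffer source = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4, 5 });
            ByteBuffer target = ByteBuffer.allocate(2); // smaller than the incoming chunk

            System.out.println(transfer(source, target)); // 2
            System.out.println(source.remaining()); // 3 bytes left for a subsequent read(...)
        }
    }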
--- .../gridfs/AsyncInputStreamAdapter.java | 133 ++++++++++++++---- 1 file changed, 103 insertions(+), 30 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java index 51ea4a1dce..f9e3dc6023 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -17,6 +17,8 @@ import lombok.RequiredArgsConstructor; import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.util.concurrent.Queues; @@ -25,14 +27,15 @@ import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.BiConsumer; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; + import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; import com.mongodb.reactivestreams.client.Success; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; @@ -66,15 +69,16 @@ class AsyncInputStreamAdapter implements AsyncInputStream { private final Publisher buffers; private final Context subscriberContext; - private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); private volatile Subscription subscription; private volatile boolean cancelled; - private volatile boolean complete; + private volatile boolean allDataBuffersReceived; private volatile Throwable error; private final Queue> readRequests = Queues.> small() .get(); + private final Queue bufferQueue = Queues. small().get(); + // see DEMAND volatile long demand; @@ -88,41 +92,75 @@ class AsyncInputStreamAdapter implements AsyncInputStream { @Override public Publisher read(ByteBuffer dst) { - return Mono.create(sink -> { + return Flux.create(sink -> { + AtomicLong written = new AtomicLong(); readRequests.offer((db, bytecount) -> { try { if (error != null) { - - sink.error(error); + onError(sink, error); return; } if (bytecount == -1) { - sink.success(-1); + onComplete(sink, written.get() > 0 ? 
written.intValue() : -1); return; } ByteBuffer byteBuffer = db.asByteBuffer(); - int toWrite = byteBuffer.remaining(); + int remaining = byteBuffer.remaining(); + int writeCapacity = Math.min(dst.remaining(), remaining); + int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity()); + int toWrite = limit - byteBuffer.position(); + + if (toWrite == 0) { + onComplete(sink, written.intValue()); + return; + } + + int oldPosition = byteBuffer.position(); + + byteBuffer.limit(toWrite); dst.put(byteBuffer); - sink.success(toWrite); + byteBuffer.limit(byteBuffer.capacity()); + byteBuffer.position(oldPosition); + db.readPosition(db.readPosition() + toWrite); + written.addAndGet(toWrite); } catch (Exception e) { - sink.error(e); + onError(sink, e); } finally { - DataBufferUtils.release(db); + + if (db != null && db.readableByteCount() == 0) { + DataBufferUtils.release(db); + } } }); - request(1); + sink.onCancel(this::terminatePendingReads); + sink.onDispose(this::terminatePendingReads); + sink.onRequest(this::request); }); } + void onError(FluxSink sink, Throwable e) { + + readRequests.poll(); + sink.error(e); + } + + void onComplete(FluxSink sink, int writtenBytes) { + + readRequests.poll(); + DEMAND.decrementAndGet(this); + sink.next(writtenBytes); + sink.complete(); + } + /* * (non-Javadoc) * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long) @@ -144,17 +182,19 @@ public Publisher close() { cancelled = true; if (error != null) { + terminatePendingReads(); sink.error(error); return; } + terminatePendingReads(); sink.success(Success.SUCCESS); }); } - protected void request(int n) { + protected void request(long n) { - if (complete) { + if (allDataBuffersReceived && bufferQueue.isEmpty()) { terminatePendingReads(); return; @@ -176,18 +216,51 @@ protected void request(int n) { requestFromSubscription(subscription); } } + } void requestFromSubscription(Subscription subscription) { - long demand = DEMAND.get(AsyncInputStreamAdapter.this); - if (cancelled) { subscription.cancel(); } - if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) { - subscription.request(1); + drainLoop(); + } + + void drainLoop() { + + while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) { + + DataBuffer wip = bufferQueue.peek(); + + if (wip == null) { + break; + } + + if (wip.readableByteCount() == 0) { + bufferQueue.poll(); + continue; + } + + BiConsumer consumer = AsyncInputStreamAdapter.this.readRequests.peek(); + if (consumer == null) { + break; + } + + consumer.accept(wip, wip.readableByteCount()); + } + + if (bufferQueue.isEmpty()) { + + if (allDataBuffersReceived) { + terminatePendingReads(); + return; + } + + if (demand > 0) { + subscription.request(1); + } } } @@ -199,7 +272,7 @@ void terminatePendingReads() { BiConsumer readers; while ((readers = readRequests.poll()) != null) { - readers.accept(factory.wrap(new byte[0]), -1); + readers.accept(null, -1); } } @@ -214,23 +287,21 @@ public Context currentContext() { public void onSubscribe(Subscription s) { AsyncInputStreamAdapter.this.subscription = s; - - Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); s.request(1); } @Override public void onNext(DataBuffer dataBuffer) { - if (cancelled || complete) { + if (cancelled || allDataBuffersReceived) { DataBufferUtils.release(dataBuffer); Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); return; } - BiConsumer poll = AsyncInputStreamAdapter.this.readRequests.poll(); + BiConsumer readRequest = 
AsyncInputStreamAdapter.this.readRequests.peek(); - if (poll == null) { + if (readRequest == null) { DataBufferUtils.release(dataBuffer); Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); @@ -238,29 +309,31 @@ public void onNext(DataBuffer dataBuffer) { return; } - poll.accept(dataBuffer, dataBuffer.readableByteCount()); + bufferQueue.offer(dataBuffer); - requestFromSubscription(subscription); + drainLoop(); } @Override public void onError(Throwable t) { - if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) { + if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) { Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext); return; } AsyncInputStreamAdapter.this.error = t; - AsyncInputStreamAdapter.this.complete = true; + AsyncInputStreamAdapter.this.allDataBuffersReceived = true; terminatePendingReads(); } @Override public void onComplete() { - AsyncInputStreamAdapter.this.complete = true; - terminatePendingReads(); + AsyncInputStreamAdapter.this.allDataBuffersReceived = true; + if (bufferQueue.isEmpty()) { + terminatePendingReads(); + } } } } From 19e1b80f6285ba0c701df4bfdb76be757134d419 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 21 Oct 2019 09:07:30 +0200 Subject: [PATCH 5/7] DATAMONGO-2393 - Polishing. Extract read requests into inner class. --- .../gridfs/AsyncInputStreamAdapter.java | 134 +++++++++++------- 1 file changed, 79 insertions(+), 55 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java index f9e3dc6023..ff2a33bf57 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -27,9 +27,7 @@ import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.function.BiConsumer; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -74,8 +72,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream { private volatile boolean cancelled; private volatile boolean allDataBuffersReceived; private volatile Throwable error; - private final Queue> readRequests = Queues.> small() - .get(); + private final Queue readRequests = Queues. small().get(); private final Queue bufferQueue = Queues. small().get(); @@ -94,52 +91,7 @@ public Publisher read(ByteBuffer dst) { return Flux.create(sink -> { - AtomicLong written = new AtomicLong(); - readRequests.offer((db, bytecount) -> { - - try { - - if (error != null) { - onError(sink, error); - return; - } - - if (bytecount == -1) { - - onComplete(sink, written.get() > 0 ? 
written.intValue() : -1); - return; - } - - ByteBuffer byteBuffer = db.asByteBuffer(); - int remaining = byteBuffer.remaining(); - int writeCapacity = Math.min(dst.remaining(), remaining); - int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity()); - int toWrite = limit - byteBuffer.position(); - - if (toWrite == 0) { - - onComplete(sink, written.intValue()); - return; - } - - int oldPosition = byteBuffer.position(); - - byteBuffer.limit(toWrite); - dst.put(byteBuffer); - byteBuffer.limit(byteBuffer.capacity()); - byteBuffer.position(oldPosition); - db.readPosition(db.readPosition() + toWrite); - written.addAndGet(toWrite); - - } catch (Exception e) { - onError(sink, e); - } finally { - - if (db != null && db.readableByteCount() == 0) { - DataBufferUtils.release(db); - } - } - }); + readRequests.offer(new ReadRequest(sink, dst)); sink.onCancel(this::terminatePendingReads); sink.onDispose(this::terminatePendingReads); @@ -243,12 +195,12 @@ void drainLoop() { continue; } - BiConsumer consumer = AsyncInputStreamAdapter.this.readRequests.peek(); + ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek(); if (consumer == null) { break; } - consumer.accept(wip, wip.readableByteCount()); + consumer.transferBytes(wip, wip.readableByteCount()); } if (bufferQueue.isEmpty()) { @@ -269,10 +221,10 @@ void drainLoop() { */ void terminatePendingReads() { - BiConsumer readers; + ReadRequest readers; while ((readers = readRequests.poll()) != null) { - readers.accept(null, -1); + readers.onComplete(); } } @@ -299,7 +251,7 @@ public void onNext(DataBuffer dataBuffer) { return; } - BiConsumer readRequest = AsyncInputStreamAdapter.this.readRequests.peek(); + ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek(); if (readRequest == null) { @@ -336,4 +288,76 @@ public void onComplete() { } } } + + /** + * Request to read bytes and transfer these to the associated {@link ByteBuffer}. 
+ */ + class ReadRequest { + + private final FluxSink sink; + private final ByteBuffer dst; + + private int writtenBytes; + + ReadRequest(FluxSink sink, ByteBuffer dst) { + this.sink = sink; + this.dst = dst; + this.writtenBytes = -1; + } + + public void onComplete() { + + if (error != null) { + AsyncInputStreamAdapter.this.onError(sink, error); + return; + } + + AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); + } + + public void transferBytes(DataBuffer db, int bytes) { + + try { + + if (error != null) { + AsyncInputStreamAdapter.this.onError(sink, error); + return; + } + + ByteBuffer byteBuffer = db.asByteBuffer(); + int remaining = byteBuffer.remaining(); + int writeCapacity = Math.min(dst.remaining(), remaining); + int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity()); + int toWrite = limit - byteBuffer.position(); + + if (toWrite == 0) { + + AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); + return; + } + + int oldPosition = byteBuffer.position(); + + byteBuffer.limit(toWrite); + dst.put(byteBuffer); + byteBuffer.limit(byteBuffer.capacity()); + byteBuffer.position(oldPosition); + db.readPosition(db.readPosition() + toWrite); + + if (writtenBytes == -1) { + writtenBytes = bytes; + } else { + writtenBytes += bytes; + } + + } catch (Exception e) { + AsyncInputStreamAdapter.this.onError(sink, e); + } finally { + + if (db.readableByteCount() == 0) { + DataBufferUtils.release(db); + } + } + } + } } From 14517045c7b3e70b4a4bbcc165bf538d9b80bb72 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 21 Oct 2019 09:10:09 +0200 Subject: [PATCH 6/7] DATAMONGO-2393 - Support configurable chunk size. We now allow consuming GridFS files using a configurable chunk size. The default chunk size is now 256kb. --- .../mongodb/gridfs/BinaryStreamAdapters.java | 6 ++- .../gridfs/ReactiveGridFsResource.java | 35 +++++++++++--- .../gridfs/ReactiveGridFsTemplate.java | 6 ++- .../gridfs/BinaryStreamAdaptersUnitTests.java | 3 +- .../DataBufferPublisherAdapterUnitTests.java | 2 +- .../gridfs/ReactiveGridFsTemplateTests.java | 48 +++++++++++++++++++ 6 files changed, 88 insertions(+), 12 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java index 0bacda78a2..0bb0b1fdde 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java @@ -42,12 +42,14 @@ class BinaryStreamAdapters { * * @param inputStream must not be {@literal null}. * @param dataBufferFactory must not be {@literal null}. + * @param bufferSize read {@code n} bytes per iteration. * @return {@link Flux} emitting {@link DataBuffer}s. 
* @see DataBufferFactory#allocateBuffer() */ - static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, + int bufferSize) { - return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) // + return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) // .filter(it -> { if (it.readableByteCount() == 0) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java index 4946d6d0d5..d67aaa3f19 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -20,6 +20,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.function.IntFunction; import org.reactivestreams.Publisher; @@ -41,20 +42,19 @@ public class ReactiveGridFsResource extends AbstractResource { private final @Nullable GridFSFile file; private final String filename; - private final Flux content; + private final IntFunction> contentFunction; /** * Creates a new, absent {@link ReactiveGridFsResource}. * * @param filename filename of the absent resource. * @param content - * @since 2.1 */ private ReactiveGridFsResource(String filename, Publisher content) { this.file = null; this.filename = filename; - this.content = Flux.from(content); + this.contentFunction = any -> Flux.from(content); } /** @@ -64,10 +64,21 @@ private ReactiveGridFsResource(String filename, Publisher content) { * @param content */ public ReactiveGridFsResource(GridFSFile file, Publisher content) { + this(file, (IntFunction>) any -> Flux.from(content)); + } + + /** + * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @param contentFunction + * @since 2.2.1 + */ + ReactiveGridFsResource(GridFSFile file, IntFunction> contentFunction) { this.file = file; this.filename = file.getFilename(); - this.content = Flux.from(content); + this.contentFunction = contentFunction; } /** @@ -165,16 +176,28 @@ public GridFSFile getGridFSFile() { } /** - * Retrieve the download stream. + * Retrieve the download stream using the default chunk size of 256kb. * * @return */ public Flux getDownloadStream() { + return getDownloadStream(256 * 1024); // 256kb buffers + } + + /** + * Retrieve the download stream. + * + * @param chunkSize chunk size in bytes to use. 
+ * @return + * @since 2.2.1 + */ + public Flux getDownloadStream(int chunkSize) { if (!exists()) { return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription()))); } - return this.content; + + return contentFunction.apply(chunkSize); } private void verifyExists() throws FileNotFoundException { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java index fcdf77df99..cec6937630 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -223,9 +223,11 @@ public Mono getResource(GridFSFile file) { return Mono.fromSupplier(() -> { - GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId()); + return new ReactiveGridFsResource(file, chunkSize -> { - return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory)); + GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId()); + return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize); + }); }); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java index 9c17d778f6..7d46d4c468 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import org.junit.Test; + import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; @@ -49,7 +50,7 @@ public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOExceptio byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); - Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory()); + Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256); DataBufferUtils.join(dataBuffers) // .as(StepVerifier::create) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java index e89d96676e..26eaebc6c2 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java @@ -44,7 +44,7 @@ public void adapterShouldPropagateErrors() { when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException())); when(asyncInput.close()).thenReturn(Mono.empty()); - Flux binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory); + Flux binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256); StepVerifier.create(binaryStream, 0) // 
.thenRequest(1) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java index cc808339fc..a95976b7d0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -24,6 +24,7 @@ import reactor.test.StepVerifier; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.UUID; import org.bson.BsonObjectId; @@ -40,7 +41,9 @@ import org.springframework.core.io.buffer.DefaultDataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.dao.IncorrectResultSizeDataAccessException; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; +import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.query.Query; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; @@ -48,6 +51,7 @@ import com.mongodb.gridfs.GridFS; import com.mongodb.gridfs.GridFSInputFile; +import com.mongodb.internal.HexUtils; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; @@ -66,6 +70,8 @@ public class ReactiveGridFsTemplateTests { @Autowired ReactiveGridFsOperations operations; @Autowired SimpleMongoDbFactory mongoClient; + @Autowired ReactiveMongoDatabaseFactory dbFactory; + @Autowired MongoConverter mongoConverter; @Before public void setUp() { @@ -92,6 +98,48 @@ public void storesAndFindsSimpleDocument() { .verifyComplete(); } + @Test // DATAMONGO-1855 + public void storesAndLoadsLargeFileCorrectly() { + + ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000 * 1); // 1 mb + + int i = 0; + while (buffer.remaining() != 0) { + byte b = (byte) (i++ % 16); + String string = HexUtils.toHex(new byte[] { b }); + buffer.put(string.getBytes()); + } + buffer.flip(); + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + + ObjectId reference = operations.store(Flux.just(factory.wrap(buffer)), "large.txt").block(); + + buffer.clear(); + + // default chunk size + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining()); + assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer); + }).verifyComplete(); + + // small chunk size + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(reactiveGridFsResource -> reactiveGridFsResource.getDownloadStream(256)) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining()); + assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer); + }).verifyComplete(); + } + @Test // DATAMONGO-2392 public void storesAndFindsByUUID() throws IOException { From c952a0db147472c0adb5ba6443cab72634d4ba8e Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 23 Oct 
2019 09:59:28 +0200 Subject: [PATCH 7/7] DATAMONGO-2393 - Remove capturing lambdas and extract methods. --- .../gridfs/DataBufferPublisherAdapter.java | 123 ++++++++++++++---- .../gridfs/ReactiveGridFsResource.java | 16 ++- .../gridfs/ReactiveGridFsTemplateTests.java | 8 +- 3 files changed, 110 insertions(+), 37 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index 654f9d7f0e..180778c93b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -29,10 +29,10 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; - import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import com.mongodb.reactivestreams.client.Success; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; /** @@ -56,34 +56,98 @@ class DataBufferPublisherAdapter { static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { - State state = new State(inputStream, dataBufferFactory, bufferSize); + return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)), + DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close); + } + + /** + * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}. + * + * @param inputStream the source stream. + * @return a {@link Flux} emitting data chunks one by one. + * @since 2.2.1 + */ + private static Flux doRead(DelegatingAsyncInputStream inputStream) { - return Flux.usingWhen(Mono.just(inputStream), it -> { + AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory, + inputStream.bufferSize); - return Flux. create((sink) -> { + return Flux.create((sink) -> { - sink.onDispose(state::close); - sink.onCancel(state::close); + sink.onDispose(streamHandler::close); + sink.onCancel(streamHandler::close); - sink.onRequest(n -> { - state.request(sink, n); - }); + sink.onRequest(n -> { + streamHandler.request(sink, n); }); - }, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close) // - .concatMap(Flux::just, 1); + }); + } + + /** + * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading + * from it, delegating operations on the {@link AsyncInputStream} to the reference instance.
+ * Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas. + * + * @author Christoph Strobl + * @since 2.2.1 + */ + private static class DelegatingAsyncInputStream implements AsyncInputStream { + + private final AsyncInputStream inputStream; + private final DataBufferFactory dataBufferFactory; + private int bufferSize; + + /** + * @param inputStream the source input stream. + * @param dataBufferFactory + * @param bufferSize + */ + DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { + + this.inputStream = inputStream; + this.dataBufferFactory = dataBufferFactory; + this.bufferSize = bufferSize; + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) + */ + @Override + public Publisher read(ByteBuffer dst) { + return inputStream.read(dst); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long) + */ + @Override + public Publisher skip(long bytesToSkip) { + return inputStream.skip(bytesToSkip); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() + */ + @Override + public Publisher close() { + return inputStream.close(); + } } @RequiredArgsConstructor - static class State { + static class AsyncInputStreamHandler { - private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, - "demand"); + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "demand"); - private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, - "state"); + private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "state"); - private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, - "read"); + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "read"); private static final int STATE_OPEN = 0; private static final int STATE_CLOSED = 1; @@ -188,6 +252,7 @@ public Context currentContext() { @Override public void onSubscribe(Subscription s) { + this.subscription = s; s.request(1); } @@ -203,14 +268,8 @@ public void onNext(Integer bytes) { if (bytes > 0) { - transport.flip(); - - DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); - dataBuffer.write(transport); - - transport.clear(); - sink.next(dataBuffer); - + DataBuffer buffer = readNextChunk(); + sink.next(buffer); decrementDemand(); } @@ -226,6 +285,18 @@ public void onNext(Integer bytes) { subscription.request(1); } + private DataBuffer readNextChunk() { + + transport.flip(); + + DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); + dataBuffer.write(transport); + + transport.clear(); + + return dataBuffer; + } + @Override public void onError(Throwable t) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java index d67aaa3f19..fb40dad10c 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -23,7 +23,6 @@ 
import java.util.function.IntFunction; import org.reactivestreams.Publisher; - import org.springframework.core.io.AbstractResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; @@ -36,10 +35,13 @@ * Reactive {@link GridFSFile} based {@link Resource} implementation. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ public class ReactiveGridFsResource extends AbstractResource { + private static final Integer DEFAULT_CHUNK_SIZE = 256 * 1024; + private final @Nullable GridFSFile file; private final String filename; private final IntFunction> contentFunction; @@ -176,19 +178,23 @@ public GridFSFile getGridFSFile() { } /** - * Retrieve the download stream using the default chunk size of 256kb. + * Retrieve the download stream using the default chunk size of 256 kB. * - * @return + * @return a {@link Flux} emitting data chunks one by one. Please make sure to + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all + * {@link DataBuffer buffers} when done. */ public Flux getDownloadStream() { - return getDownloadStream(256 * 1024); // 256kb buffers + return getDownloadStream(DEFAULT_CHUNK_SIZE); } /** * Retrieve the download stream. * * @param chunkSize chunk size in bytes to use. - * @return + * @return a {@link Flux} emitting data chunks one by one. Please make sure to + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all + * {@link DataBuffer buffers} when done. * @since 2.2.1 */ public Flux getDownloadStream(int chunkSize) { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java index a95976b7d0..9914443c8d 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -33,7 +33,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; @@ -101,13 +100,10 @@ public void storesAndFindsSimpleDocument() { @Test // DATAMONGO-1855 public void storesAndLoadsLargeFileCorrectly() { - ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000 * 1); // 1 mb - + ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 mb int i = 0; while (buffer.remaining() != 0) { - byte b = (byte) (i++ % 16); - String string = HexUtils.toHex(new byte[] { b }); - buffer.put(string.getBytes()); + buffer.put(HexUtils.toHex(new byte[] { (byte) (i++ % 16) }).getBytes()); } buffer.flip();