Description
The ResponsePublisher.PublisherAdapter conservatively copy the content of the ByteBuf to a new ByteBuffer to pass to
the subscriber in case the consumption will be async.
Describe the Feature
Use a marker interface or annotation or a configuration option to indicate that subscriber is not async and we can then use the ByteBuf.nioBuffer instead of doing a copy of the buffer. That will reduce allocation and the work needed to download file for non async subscriber.
Is your Feature Request related to a problem?
Proposed Solution
There does not seem to be a way to indicate if the subscription is async or not in the reactive stream api - but I'm not very well verse with that.
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
boolean isAsync = isAsyncSubscriber(subscriber);
so with an marker interface that could look like that
// assume async unless mark otherwise
public isAsyncSubscriber(Subscriber<?> subscriber) {
return subscriber instanceof SyncSubscriber;
}
the on next method can check an isAsync flag
@Override
public void onNext(HttpContent httpContent) {
// isDone may be true if the subscriber cancelled
if (isDone.get()) {
httpContent.release();
return;
}
// Needed to prevent use-after-free bug if the subscriber's onNext is asynchronous
ByteBuffer byteBuffer =
tryCatchFinally(() -> isAsync? copyToByteBuffer(httpContent.content()) : httpContent.content().nioBuffer() ,
this::onError,
httpContent::release);
//As per reactive-streams rule 2.13, we should not call subscriber#onError when
//exception is thrown from subscriber#onNext
if (byteBuffer != null) {
tryCatch(() -> subscriber.onNext(byteBuffer),
this::notifyError);
}
}
Describe alternatives you've considered
- could use annotation, but the reflection cost would be quite high
- could use as part of the configuration, but that would then apply globally
- could use as part of the GetObjectRequest but that seems pretty far from the code it concern
Additional Context
The goal is to reduce the load, Netty has its own allocator, but we are losing the benefits of that by forcing a copy for the async case.
- I may be able to implement this feature request
Your Environment
- AWS Java SDK version used:
- JDK version used:
- Operating System and version: