Skip to content

Commit 26d800c

Browse files
committed
Fix empty payload handling in RSocketRequester
Closes gh-24088
1 parent 5a552f1 commit 26d800c

File tree

2 files changed

+45
-35
lines changed

2 files changed

+45
-35
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
5757

5858
private final RSocketStrategies strategies;
5959

60-
private final DataBuffer emptyDataBuffer;
60+
private final Mono<DataBuffer> emptyBufferMono;
6161

6262

6363
DefaultRSocketRequester(
@@ -73,7 +73,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
7373
this.dataMimeType = dataMimeType;
7474
this.metadataMimeType = metadataMimeType;
7575
this.strategies = strategies;
76-
this.emptyDataBuffer = this.strategies.dataBufferFactory().wrap(new byte[0]);
76+
this.emptyBufferMono = Mono.just(this.strategies.dataBufferFactory().wrap(new byte[0]));
7777
}
7878

7979

@@ -193,7 +193,7 @@ else if (adapter != null) {
193193
}
194194

195195
if (isVoid(elementType) || (adapter != null && adapter.isNoValue())) {
196-
this.payloadMono = firstPayload(Mono.when(publisher).then(Mono.just(emptyDataBuffer)));
196+
this.payloadMono = Mono.when(publisher).then(firstPayload(emptyBufferMono));
197197
this.payloadFlux = null;
198198
return;
199199
}
@@ -204,7 +204,7 @@ else if (adapter != null) {
204204
if (adapter != null && !adapter.isMultiValue()) {
205205
Mono<DataBuffer> data = Mono.from(publisher)
206206
.map(value -> encodeData(value, elementType, encoder))
207-
.defaultIfEmpty(emptyDataBuffer);
207+
.switchIfEmpty(emptyBufferMono);
208208
this.payloadMono = firstPayload(data);
209209
this.payloadFlux = null;
210210
return;
@@ -213,7 +213,7 @@ else if (adapter != null) {
213213
this.payloadMono = null;
214214
this.payloadFlux = Flux.from(publisher)
215215
.map(value -> encodeData(value, elementType, encoder))
216-
.defaultIfEmpty(emptyDataBuffer)
216+
.switchIfEmpty(emptyBufferMono)
217217
.switchOnFirst((signal, inner) -> {
218218
DataBuffer data = signal.get();
219219
if (data != null) {
@@ -250,12 +250,7 @@ private Mono<Payload> firstPayload(Mono<DataBuffer> encodedData) {
250250

251251
@Override
252252
public Mono<Void> send() {
253-
return getPayloadMonoRequired().flatMap(rsocket::fireAndForget);
254-
}
255-
256-
private Mono<Payload> getPayloadMonoRequired() {
257-
Assert.state(this.payloadFlux == null, "No RSocket interaction model for Flux request to Mono response.");
258-
return this.payloadMono != null ? this.payloadMono : firstPayload(Mono.just(emptyDataBuffer));
253+
return getPayloadMono().flatMap(rsocket::fireAndForget);
259254
}
260255

261256
@Override
@@ -268,19 +263,9 @@ public <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef) {
268263
return retrieveMono(ResolvableType.forType(dataTypeRef));
269264
}
270265

271-
@Override
272-
public <T> Flux<T> retrieveFlux(Class<T> dataType) {
273-
return retrieveFlux(ResolvableType.forClass(dataType));
274-
}
275-
276-
@Override
277-
public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {
278-
return retrieveFlux(ResolvableType.forType(dataTypeRef));
279-
}
280-
281266
@SuppressWarnings("unchecked")
282267
private <T> Mono<T> retrieveMono(ResolvableType elementType) {
283-
Mono<Payload> payloadMono = getPayloadMonoRequired().flatMap(rsocket::requestResponse);
268+
Mono<Payload> payloadMono = getPayloadMono().flatMap(rsocket::requestResponse);
284269

285270
if (isVoid(elementType)) {
286271
return (Mono<T>) payloadMono.then();
@@ -291,11 +276,22 @@ private <T> Mono<T> retrieveMono(ResolvableType elementType) {
291276
.map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
292277
}
293278

279+
@Override
280+
public <T> Flux<T> retrieveFlux(Class<T> dataType) {
281+
return retrieveFlux(ResolvableType.forClass(dataType));
282+
}
283+
284+
@Override
285+
public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {
286+
return retrieveFlux(ResolvableType.forType(dataTypeRef));
287+
}
288+
294289
@SuppressWarnings("unchecked")
295290
private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
296-
Flux<Payload> payloadFlux = this.payloadMono != null ?
297-
this.payloadMono.flatMapMany(rsocket::requestStream) :
298-
rsocket.requestChannel(this.payloadFlux);
291+
292+
Flux<Payload> payloadFlux = (this.payloadFlux != null ?
293+
rsocket.requestChannel(this.payloadFlux) :
294+
getPayloadMono().flatMapMany(rsocket::requestStream));
299295

300296
if (isVoid(elementType)) {
301297
return payloadFlux.thenMany(Flux.empty());
@@ -306,6 +302,11 @@ private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
306302
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
307303
}
308304

305+
private Mono<Payload> getPayloadMono() {
306+
Assert.state(this.payloadFlux == null, "No RSocket interaction with Flux request and Mono response.");
307+
return this.payloadMono != null ? this.payloadMono : firstPayload(emptyBufferMono);
308+
}
309+
309310
private DataBuffer retainDataAndReleasePayload(Payload payload) {
310311
return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory());
311312
}

spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,6 @@ public void sendWithoutData() {
145145
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
146146
}
147147

148-
@Test
149-
public void sendMonoWithoutData() {
150-
this.requester.route("toA").retrieveMono(String.class).block(Duration.ofSeconds(5));
151-
152-
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestResponse");
153-
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
154-
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
155-
}
156-
157148
@Test
158149
public void testSendWithAsyncMetadata() {
159150

@@ -205,6 +196,15 @@ public void retrieveMonoVoid() {
205196
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestResponse");
206197
}
207198

199+
@Test
200+
public void retrieveMonoWithoutData() {
201+
this.requester.route("toA").retrieveMono(String.class).block(Duration.ofSeconds(5));
202+
203+
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestResponse");
204+
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
205+
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
206+
}
207+
208208
@Test
209209
public void retrieveFlux() {
210210
String[] values = new String[] {"bodyA", "bodyB", "bodyC"};
@@ -227,11 +227,20 @@ public void retrieveFluxVoid() {
227227
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestStream");
228228
}
229229

230+
@Test
231+
public void retrieveFluxWithoutData() {
232+
this.requester.route("toA").retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));
233+
234+
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestStream");
235+
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
236+
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
237+
}
238+
230239
@Test
231240
public void fluxToMonoIsRejected() {
232241
assertThatIllegalStateException()
233242
.isThrownBy(() -> this.requester.route("").data(Flux.just("a", "b")).retrieveMono(String.class))
234-
.withMessage("No RSocket interaction model for Flux request to Mono response.");
243+
.withMessage("No RSocket interaction with Flux request and Mono response.");
235244
}
236245

237246
private Payload toPayload(String value) {

0 commit comments

Comments
 (0)