Skip to content

Commit cbc5746

Browse files
committed
Support for maxInMemorySize in SSE reader
Closes gh-24312
1 parent a741ae4 commit cbc5746

File tree

5 files changed

+138
-26
lines changed

5 files changed

+138
-26
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferLimitException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
* This can be raised when data buffers are cached and aggregated, e.g.
2222
* {@link DataBufferUtils#join}. Or it could also be raised when data buffers
2323
* have been released but a parsed representation is being aggregated, e.g. async
24-
* parsing with Jackson.
24+
* parsing with Jackson, SSE parsing and aggregating lines per event.
2525
*
2626
* @author Rossen Stoyanchev
2727
* @since 5.1.11

spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.springframework.core.codec.StringDecoder;
3232
import org.springframework.core.io.buffer.DataBuffer;
3333
import org.springframework.core.io.buffer.DataBufferFactory;
34+
import org.springframework.core.io.buffer.DataBufferLimitException;
3435
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
3536
import org.springframework.http.MediaType;
3637
import org.springframework.http.ReactiveHttpInputMessage;
@@ -48,14 +49,16 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
4849

4950
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
5051

51-
private static final StringDecoder stringDecoder = StringDecoder.textPlainOnly();
52-
5352
private static final ResolvableType STRING_TYPE = ResolvableType.forClass(String.class);
5453

5554

5655
@Nullable
5756
private final Decoder<?> decoder;
5857

58+
private final StringDecoder lineDecoder = StringDecoder.textPlainOnly();
59+
60+
61+
5962

6063
/**
6164
* Constructor without a {@code Decoder}. In this mode only {@code String}
@@ -82,6 +85,29 @@ public Decoder<?> getDecoder() {
8285
return this.decoder;
8386
}
8487

88+
/**
89+
* Configure a limit on the maximum number of bytes per SSE event which are
90+
* buffered before the event is parsed.
91+
* <p>Note that the {@link #getDecoder() data decoder}, if provided, must
92+
* also be customized accordingly to raise the limit if necessary in order
93+
* to be able to parse the data portion of the event.
94+
* <p>By default this is set to 256K.
95+
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
96+
* @since 5.1.13
97+
*/
98+
public void setMaxInMemorySize(int byteCount) {
99+
this.lineDecoder.setMaxInMemorySize(byteCount);
100+
}
101+
102+
/**
103+
* Return the {@link #setMaxInMemorySize configured} byte count limit.
104+
* @since 5.1.13
105+
*/
106+
public int getMaxInMemorySize() {
107+
return this.lineDecoder.getMaxInMemorySize();
108+
}
109+
110+
85111
@Override
86112
public List<MediaType> getReadableMediaTypes() {
87113
return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
@@ -101,12 +127,15 @@ private boolean isServerSentEvent(ResolvableType elementType) {
101127
public Flux<Object> read(
102128
ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
103129

130+
LimitTracker limitTracker = new LimitTracker();
131+
104132
boolean shouldWrap = isServerSentEvent(elementType);
105133
ResolvableType valueType = (shouldWrap ? elementType.getGeneric() : elementType);
106134

107-
return stringDecoder.decode(message.getBody(), STRING_TYPE, null, hints)
135+
return this.lineDecoder.decode(message.getBody(), STRING_TYPE, null, hints)
136+
.doOnNext(limitTracker::afterLineParsed)
108137
.bufferUntil(String::isEmpty)
109-
.concatMap(lines -> Mono.justOrEmpty(buildEvent(lines, valueType, shouldWrap, hints)));
138+
.map(lines -> buildEvent(lines, valueType, shouldWrap, hints));
110139
}
111140

112141
@Nullable
@@ -172,16 +201,47 @@ private Object decodeData(String data, ResolvableType dataType, Map<String, Obje
172201
public Mono<Object> readMono(
173202
ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
174203

175-
// We're ahead of String + "*/*"
176-
// Let's see if we can aggregate the output (lest we time out)...
204+
// In order of readers, we're ahead of String + "*/*"
205+
// If this is called, simply delegate to StringDecoder
177206

178207
if (elementType.resolve() == String.class) {
179208
Flux<DataBuffer> body = message.getBody();
180-
return stringDecoder.decodeToMono(body, elementType, null, null).cast(Object.class);
209+
return this.lineDecoder.decodeToMono(body, elementType, null, null).cast(Object.class);
181210
}
182211

183212
return Mono.error(new UnsupportedOperationException(
184213
"ServerSentEventHttpMessageReader only supports reading stream of events as a Flux"));
185214
}
186215

216+
217+
private class LimitTracker {
218+
219+
private int accumulated = 0;
220+
221+
222+
public void afterLineParsed(String line) {
223+
if (getMaxInMemorySize() < 0) {
224+
return;
225+
}
226+
if (line.isEmpty()) {
227+
this.accumulated = 0;
228+
}
229+
if (line.length() > Integer.MAX_VALUE - this.accumulated) {
230+
raiseLimitException();
231+
}
232+
else {
233+
this.accumulated += line.length();
234+
if (this.accumulated > getMaxInMemorySize()) {
235+
raiseLimitException();
236+
}
237+
}
238+
}
239+
240+
private void raiseLimitException() {
241+
// Do not release here, it's likely down via doOnDiscard..
242+
throw new DataBufferLimitException(
243+
"Exceeded limit on max bytes to buffer : " + getMaxInMemorySize());
244+
}
245+
}
246+
187247
}

spring-web/src/main/java/org/springframework/http/codec/support/BaseDefaultCodecs.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -238,9 +238,6 @@ private void initCodec(@Nullable Object codec) {
238238
if (codec instanceof DecoderHttpMessageReader) {
239239
codec = ((DecoderHttpMessageReader) codec).getDecoder();
240240
}
241-
else if (codec instanceof ServerSentEventHttpMessageReader) {
242-
codec = ((ServerSentEventHttpMessageReader) codec).getDecoder();
243-
}
244241

245242
if (codec == null) {
246243
return;
@@ -269,6 +266,10 @@ else if (codec instanceof ServerSentEventHttpMessageReader) {
269266
if (codec instanceof FormHttpMessageReader) {
270267
((FormHttpMessageReader) codec).setMaxInMemorySize(size);
271268
}
269+
if (codec instanceof ServerSentEventHttpMessageReader) {
270+
((ServerSentEventHttpMessageReader) codec).setMaxInMemorySize(size);
271+
initCodec(((ServerSentEventHttpMessageReader) codec).getDecoder());
272+
}
272273
if (synchronossMultipartPresent) {
273274
if (codec instanceof SynchronossPartHttpMessageReader) {
274275
((SynchronossPartHttpMessageReader) codec).setMaxInMemorySize(size);

spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727

2828
import org.springframework.core.ResolvableType;
2929
import org.springframework.core.io.buffer.DataBuffer;
30+
import org.springframework.core.io.buffer.DataBufferLimitException;
3031
import org.springframework.core.testfixture.io.buffer.AbstractLeakCheckingTests;
3132
import org.springframework.http.MediaType;
3233
import org.springframework.http.codec.json.Jackson2JsonDecoder;
@@ -42,20 +43,21 @@
4243
*/
4344
public class ServerSentEventHttpMessageReaderTests extends AbstractLeakCheckingTests {
4445

45-
private ServerSentEventHttpMessageReader messageReader =
46-
new ServerSentEventHttpMessageReader(new Jackson2JsonDecoder());
46+
private Jackson2JsonDecoder jsonDecoder = new Jackson2JsonDecoder();
47+
48+
private ServerSentEventHttpMessageReader reader = new ServerSentEventHttpMessageReader(this.jsonDecoder);
4749

4850

4951
@Test
5052
public void cantRead() {
51-
assertThat(messageReader.canRead(ResolvableType.forClass(Object.class), new MediaType("foo", "bar"))).isFalse();
52-
assertThat(messageReader.canRead(ResolvableType.forClass(Object.class), null)).isFalse();
53+
assertThat(reader.canRead(ResolvableType.forClass(Object.class), new MediaType("foo", "bar"))).isFalse();
54+
assertThat(reader.canRead(ResolvableType.forClass(Object.class), null)).isFalse();
5355
}
5456

5557
@Test
5658
public void canRead() {
57-
assertThat(messageReader.canRead(ResolvableType.forClass(Object.class), new MediaType("text", "event-stream"))).isTrue();
58-
assertThat(messageReader.canRead(ResolvableType.forClass(ServerSentEvent.class), new MediaType("foo", "bar"))).isTrue();
59+
assertThat(reader.canRead(ResolvableType.forClass(Object.class), new MediaType("text", "event-stream"))).isTrue();
60+
assertThat(reader.canRead(ResolvableType.forClass(ServerSentEvent.class), new MediaType("foo", "bar"))).isTrue();
5961
}
6062

6163
@Test
@@ -66,7 +68,7 @@ public void readServerSentEvents() {
6668
"id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n" +
6769
"id:c43\nevent:bar\nretry:456\ndata:baz\n\n")));
6870

69-
Flux<ServerSentEvent> events = this.messageReader
71+
Flux<ServerSentEvent> events = this.reader
7072
.read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class),
7173
request, Collections.emptyMap()).cast(ServerSentEvent.class);
7274

@@ -98,7 +100,7 @@ public void readServerSentEventsWithMultipleChunks() {
98100
stringBuffer("ent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:"),
99101
stringBuffer("bar\n\nid:c43\nevent:bar\nretry:456\ndata:baz\n\n")));
100102

101-
Flux<ServerSentEvent> events = messageReader
103+
Flux<ServerSentEvent> events = reader
102104
.read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class),
103105
request, Collections.emptyMap()).cast(ServerSentEvent.class);
104106

@@ -126,7 +128,7 @@ public void readString() {
126128
MockServerHttpRequest request = MockServerHttpRequest.post("/")
127129
.body(Mono.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n")));
128130

129-
Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
131+
Flux<String> data = reader.read(ResolvableType.forClass(String.class),
130132
request, Collections.emptyMap()).cast(String.class);
131133

132134
StepVerifier.create(data)
@@ -143,7 +145,7 @@ public void readPojo() {
143145
"data:{\"foo\": \"foofoo\", \"bar\": \"barbar\"}\n\n" +
144146
"data:{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}\n\n")));
145147

146-
Flux<Pojo> data = messageReader.read(ResolvableType.forClass(Pojo.class), request,
148+
Flux<Pojo> data = reader.read(ResolvableType.forClass(Pojo.class), request,
147149
Collections.emptyMap()).cast(Pojo.class);
148150

149151
StepVerifier.create(data)
@@ -165,7 +167,7 @@ public void decodeFullContentAsString() {
165167
MockServerHttpRequest request = MockServerHttpRequest.post("/")
166168
.body(Mono.just(stringBuffer(body)));
167169

168-
String actual = messageReader
170+
String actual = reader
169171
.readMono(ResolvableType.forClass(String.class), request, Collections.emptyMap())
170172
.cast(String.class)
171173
.block(Duration.ZERO);
@@ -182,7 +184,7 @@ public void readError() {
182184
MockServerHttpRequest request = MockServerHttpRequest.post("/")
183185
.body(body);
184186

185-
Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
187+
Flux<String> data = reader.read(ResolvableType.forClass(String.class),
186188
request, Collections.emptyMap()).cast(String.class);
187189

188190
StepVerifier.create(data)
@@ -192,6 +194,54 @@ public void readError() {
192194
.verify();
193195
}
194196

197+
@Test
198+
public void maxInMemoryLimit() {
199+
200+
this.reader.setMaxInMemorySize(17);
201+
202+
MockServerHttpRequest request = MockServerHttpRequest.post("/")
203+
.body(Flux.just(stringBuffer("data:\"TOO MUCH DATA\"\ndata:bar\n\ndata:baz\n\n")));
204+
205+
Flux<String> data = this.reader.read(ResolvableType.forClass(String.class),
206+
request, Collections.emptyMap()).cast(String.class);
207+
208+
StepVerifier.create(data)
209+
.expectError(DataBufferLimitException.class)
210+
.verify();
211+
}
212+
213+
@Test // gh-24312
214+
public void maxInMemoryLimitAllowsReadingPojoLargerThanDefaultSize() {
215+
216+
int limit = this.jsonDecoder.getMaxInMemorySize();
217+
218+
String fooValue = getStringOfSize(limit) + "and then some more";
219+
String content = "data:{\"foo\": \"" + fooValue + "\"}\n\n";
220+
MockServerHttpRequest request = MockServerHttpRequest.post("/").body(Mono.just(stringBuffer(content)));
221+
222+
Jackson2JsonDecoder jacksonDecoder = new Jackson2JsonDecoder();
223+
ServerSentEventHttpMessageReader messageReader = new ServerSentEventHttpMessageReader(jacksonDecoder);
224+
225+
jacksonDecoder.setMaxInMemorySize(limit + 1024);
226+
messageReader.setMaxInMemorySize(limit + 1024);
227+
228+
Flux<Pojo> data = messageReader.read(ResolvableType.forClass(Pojo.class), request,
229+
Collections.emptyMap()).cast(Pojo.class);
230+
231+
StepVerifier.create(data)
232+
.consumeNextWith(pojo -> assertThat(pojo.getFoo()).isEqualTo(fooValue))
233+
.expectComplete()
234+
.verify();
235+
}
236+
237+
private static String getStringOfSize(long size) {
238+
StringBuilder content = new StringBuilder("Aa");
239+
while (content.length() < size) {
240+
content.append(content);
241+
}
242+
return content.toString();
243+
}
244+
195245
private DataBuffer stringBuffer(String value) {
196246
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
197247
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);

spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public void maxInMemorySize() {
140140
assertThat(((Jaxb2XmlDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
141141

142142
ServerSentEventHttpMessageReader reader = (ServerSentEventHttpMessageReader) nextReader(readers);
143+
assertThat(reader.getMaxInMemorySize()).isEqualTo(size);
143144
assertThat(((Jackson2JsonDecoder) reader.getDecoder()).getMaxInMemorySize()).isEqualTo(size);
144145

145146
assertThat(((StringDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);

0 commit comments

Comments
 (0)