Description
Marc-Christian Schulze opened SPR-16639 and commented
I'm facing exceptions in my WebFlux Spring Boot application that are all related to the handling of multipart web requests. Once these requests hit the threshold and need to be stored temporarly in the filesystem sporadic race conditions occur.
My Dependencies:
- Spring Boot 2.0.0.RC1
- Spring 5.0.4.RELEASE
- reactor-netty 0.7.5.RELEASE
- reactor-core 3.1.5.RELEASE
- reactor-spring 1.0.1.RELEASE
Typically it fails with the reason that it was unable to create the directory /tmp/nio-stream-storage
but if I look into the /tmp
directory I can see that it was not present before and has been created. If I run the application again it fails sporadically while looking up the temporary files inside of the folder /tmp/nio-stream-storage
:
java.lang.IllegalStateException: Unable to create the inputStream.
at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:324)
at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.getInputStream(FileStreamStorage.java:245)
at org.springframework.core.io.buffer.DataBufferUtils.lambda$readInputStream$1(DataBufferUtils.java:97)
at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:75)
at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61)
at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59)
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461)
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1049)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)
at reactor.ipc.netty.NettyOutbound.lambda$sendObject$6(NettyOutbound.java:298)
at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:134)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
at reactor.core.publisher.MonoSourceFlux.subscribe(MonoSourceFlux.java:47)
at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:172)
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53)
at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:380)
at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:501)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /tmp/nio-stream-storage/nio-body-1-85f003fe-a58a-4b79-8563-e4b3e6054a40.tmp (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream.<init>(NameAwarePurgableFileInputStream.java:49)
at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:322)
... 56 common frames omitted
This smells like a race condition in the handling of this temporary directory and files contained in.
BTW,
Is it actually intended that WebFlux writes uploaded multipart files to disk? I thought the actual intention of being reactive is to leverage backpressure propagation.
Unfortunately, I can't provide a reproducible example.
However, reducing my code to a skeleton it would look like (basically a multi-file upload proxy):
@ResponseBody
@RequestMapping(path = "/somePath", method = RequestMethod.POST, consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Flux<SomeElement> entrypoint( @RequestBody Flux<Part> parts ) {
return parts.filter(part -> FilePart.class.isInstance(part))
.cast(FilePart.class)
.flatMap(part -> {
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.asyncPart("file", part.content(), DataBuffer.class).headers(h -> {
h.setContentDispositionFormData("file", part.filename());
h.setContentType(MediaType.APPLICATION_OCTET_STREAM);
});
return client
.post()
.uri("http://somewhere/path")
.contentType(MediaType.MULTIPART_FORM_DATA)
.syncBody(builder.build())
.retrieve()
.bodyToMono(SomeElement.class);
});
}
Affects: 5.0.4
Issue Links:
- Make WebFlux multipart support fully Reactive [SPR-17122] #21659 Make WebFlux multipart support fully Reactive
- SynchronossPartGenerator should reuse PartBodyStreamStorageFactory [SPR-16727] #21268 SynchronossPartGenerator should reuse PartBodyStreamStorageFactory
Referenced from: commits a989ea0