Skip to content

Race-Condition in Multipart upload proxy scenario [SPR-16639] #21180

Closed
@spring-projects-issues

Description

@spring-projects-issues

Marc-Christian Schulze opened SPR-16639 and commented

I'm facing exceptions in my WebFlux Spring Boot application that are all related to the handling of multipart web requests. Once these requests hit the threshold and need to be stored temporarly in the filesystem sporadic race conditions occur.

My Dependencies:

  • Spring Boot 2.0.0.RC1
  • Spring 5.0.4.RELEASE
  • reactor-netty 0.7.5.RELEASE
  • reactor-core 3.1.5.RELEASE
  • reactor-spring 1.0.1.RELEASE

Typically it fails with the reason that it was unable to create the directory /tmp/nio-stream-storage but if I look into the /tmp directory I can see that it was not present before and has been created. If I run the application again it fails sporadically while looking up the temporary files inside of the folder /tmp/nio-stream-storage:

java.lang.IllegalStateException: Unable to create the inputStream.
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:324)
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.getInputStream(FileStreamStorage.java:245)
	at org.springframework.core.io.buffer.DataBufferUtils.lambda$readInputStream$1(DataBufferUtils.java:97)
	at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:75)
	at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
	at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61)
	at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59)
	at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1049)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)
	at reactor.ipc.netty.NettyOutbound.lambda$sendObject$6(NettyOutbound.java:298)
	at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:134)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
	at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
	at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
	at reactor.core.publisher.MonoSourceFlux.subscribe(MonoSourceFlux.java:47)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:172)
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53)
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:380)
	at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:501)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /tmp/nio-stream-storage/nio-body-1-85f003fe-a58a-4b79-8563-e4b3e6054a40.tmp (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream.<init>(NameAwarePurgableFileInputStream.java:49)
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:322)
	... 56 common frames omitted

This smells like a race condition in the handling of this temporary directory and files contained in.

BTW,
Is it actually intended that WebFlux writes uploaded multipart files to disk? I thought the actual intention of being reactive is to leverage backpressure propagation.

Unfortunately, I can't provide a reproducible example.
However, reducing my code to a skeleton it would look like (basically a multi-file upload proxy):

@ResponseBody
@RequestMapping(path = "/somePath", method = RequestMethod.POST, consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Flux<SomeElement> entrypoint( @RequestBody Flux<Part> parts ) {
  return parts.filter(part -> FilePart.class.isInstance(part))
    .cast(FilePart.class)
    .flatMap(part -> {
      MultipartBodyBuilder builder = new MultipartBodyBuilder();
      builder.asyncPart("file", part.content(), DataBuffer.class).headers(h -> {
        h.setContentDispositionFormData("file", part.filename());
        h.setContentType(MediaType.APPLICATION_OCTET_STREAM);
      });

      return client 
        .post() 
        .uri("http://somewhere/path") 
        .contentType(MediaType.MULTIPART_FORM_DATA) 
        .syncBody(builder.build()) 
        .retrieve() 
        .bodyToMono(SomeElement.class);
    });
}

Affects: 5.0.4

Issue Links:

Referenced from: commits a989ea0

Metadata

Metadata

Assignees

Labels

in: coreIssues in core modules (aop, beans, core, context, expression)in: webIssues in web modules (web, webmvc, webflux, websocket)status: invalidAn issue that we don't feel is valid

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions