Race-Condition in Multipart upload proxy scenario [SPR-16639] #21180

Closed
spring-projects-issues opened this issue Mar 23, 2018 · 11 comments
Labels: in: core (Issues in core modules: aop, beans, core, context, expression), in: web (Issues in web modules: web, webmvc, webflux, websocket), status: invalid (An issue that we don't feel is valid)

Comments

@spring-projects-issues

spring-projects-issues commented Mar 23, 2018

Marc-Christian Schulze opened SPR-16639 and commented

I'm facing exceptions in my WebFlux Spring Boot application that are all related to the handling of multipart web requests. Once these requests hit the threshold and need to be stored temporarily in the filesystem, sporadic race conditions occur.

My Dependencies:

  • Spring Boot 2.0.0.RC1
  • Spring 5.0.4.RELEASE
  • reactor-netty 0.7.5.RELEASE
  • reactor-core 3.1.5.RELEASE
  • reactor-spring 1.0.1.RELEASE

Typically it fails because it was unable to create the directory /tmp/nio-stream-storage, but when I look into /tmp I can see that the directory, which was not present before, has been created. If I run the application again, it fails sporadically while looking up the temporary files inside /tmp/nio-stream-storage:

java.lang.IllegalStateException: Unable to create the inputStream.
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:324)
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.getInputStream(FileStreamStorage.java:245)
	at org.springframework.core.io.buffer.DataBufferUtils.lambda$readInputStream$1(DataBufferUtils.java:97)
	at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:75)
	at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
	at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61)
	at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59)
	at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1049)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)
	at reactor.ipc.netty.NettyOutbound.lambda$sendObject$6(NettyOutbound.java:298)
	at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:134)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
	at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
	at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
	at reactor.core.publisher.MonoSourceFlux.subscribe(MonoSourceFlux.java:47)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:172)
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53)
	at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:380)
	at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:501)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /tmp/nio-stream-storage/nio-body-1-85f003fe-a58a-4b79-8563-e4b3e6054a40.tmp (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream.<init>(NameAwarePurgableFileInputStream.java:49)
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:322)
	... 56 common frames omitted

This smells like a race condition in the handling of this temporary directory and the files it contains.

BTW,
Is it actually intended that WebFlux writes uploaded multipart files to disk? I thought the actual intention of being reactive is to leverage backpressure propagation.

Unfortunately, I can't provide a reproducible example.
However, reduced to a skeleton, my code looks like this (basically a multi-file upload proxy):

@ResponseBody
@RequestMapping(path = "/somePath", method = RequestMethod.POST, consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Flux<SomeElement> entrypoint( @RequestBody Flux<Part> parts ) {
  return parts.filter(part -> FilePart.class.isInstance(part))
    .cast(FilePart.class)
    .flatMap(part -> {
      MultipartBodyBuilder builder = new MultipartBodyBuilder();
      builder.asyncPart("file", part.content(), DataBuffer.class).headers(h -> {
        h.setContentDispositionFormData("file", part.filename());
        h.setContentType(MediaType.APPLICATION_OCTET_STREAM);
      });

      return client 
        .post() 
        .uri("http://somewhere/path") 
        .contentType(MediaType.MULTIPART_FORM_DATA) 
        .syncBody(builder.build()) 
        .retrieve() 
        .bodyToMono(SomeElement.class);
    });
}

Affects: 5.0.4

Issue Links:

Referenced from: commits a989ea0

@spring-projects-issues

Rossen Stoyanchev commented

Since there is no repro project, can you try with Spring Framework 5.0.5.BUILD-SNAPSHOT and Reactor Netty 0.7.6.BUILD-SNAPSHOT?

@spring-projects-issues

Marc-Christian Schulze commented

I've upgraded both dependencies as you advised but the error is still there:

java.lang.IllegalStateException: Unable to create the inputStream.
  at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:324)
  at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.getInputStream(FileStreamStorage.java:245)
  at org.springframework.core.io.buffer.DataBufferUtils.lambda$readInputStream$1(DataBufferUtils.java:97)
  at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:75)
  at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86)
  at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
  at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
  at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
  at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
  at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
  at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
  at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
  at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
  at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
  at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
  at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
  at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
  at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
  at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
  at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418)
  at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210)
  at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128)
  at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61)
  at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
  at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
  at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:200)
  at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
  at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59)
  at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49)
  at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
  at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461)
  at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191)
  at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
  at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
  at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
  at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
  at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
  at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1032)
  at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:296)
  at reactor.ipc.netty.NettyOutbound.lambda$sendObject$6(NettyOutbound.java:298)
  at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:134)
  at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
  at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
  at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
  at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141)
  at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
  at reactor.core.publisher.MonoSourceFlux.subscribe(MonoSourceFlux.java:47)
  at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
  at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:172)
  at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53)
  at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:380)
  at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:501)
  at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
  at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /tmp/nio-stream-storage/nio-body-1-1a16487f-41a0-409b-8427-1fe967ce1ff8.tmp (No such file or directory)
  at java.io.FileInputStream.open0(Native Method)
  at java.io.FileInputStream.open(FileInputStream.java:195)
  at java.io.FileInputStream.<init>(FileInputStream.java:138)
  at org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream.<init>(NameAwarePurgableFileInputStream.java:49)
  at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:322)
  ... 56 common frames omitted

@spring-projects-issues

Rossen Stoyanchev commented

For the Spring Boot dependency did you really mean 2.0.0.RELEASE (2.0.0.RC1 doesn't match the rest) ?

The Synchronoss multipart library doesn't quite go as far as supporting Reactive Streams, so currently the request body is consumed without back pressure. By default it seems, part content is kept in memory only if the Content-Length < 10K, and otherwise if greater (or unknown) it is written to disk. Reading from the files is then backpressured (via DataBufferUtils) but the writing to the files is not. We could probably streamline this by plugging in a custom StreamStorage that exposes a Flux, and hooking in back pressure end-to-end, but that would be a separate ticket for sure (and after 5.0.x).

The error suggests when it's time to read, it can't find the temp file where the part content was written. Looking at the file field in FileStreamStorage, it's only set once in the constructor, (probably could be final). Also each part gets its own StreamStorage instance, so it seems unlikely that a race condition can mix up the file name.

The only remotely plausible scenarios are -- temp file name clash (one getting cleaned up first) but that also seems unlikely since it's using UUID to create a unique file name; or multipart content consumed twice, possibly indirectly, e.g. via HiddenHttpMethodFilter (you can put a breakpoint in FileStreamStorage.getInputStream() for example to check).
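
To make the "consumed twice" scenario concrete, here is a minimal sketch using only the JDK. It is a simplified stand-in, not the Synchronoss implementation: the class OneShotPart is hypothetical, and it assumes the backing temp file is purged once the first reader is done (the name NameAwarePurgableFileInputStream in the trace hints at this, but that is an assumption). Under that assumption, a second read fails with a missing-file error of the same kind as the one in the stack trace.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class OneShotPartDemo {

    // Hypothetical stand-in for a file-backed multipart part.
    static final class OneShotPart {
        private final Path file;

        OneShotPart(byte[] content) throws IOException {
            this.file = Files.createTempFile("nio-body-", ".tmp");
            Files.write(file, content);
        }

        // Reads the whole content, then deletes the backing file
        // (assumed purge-after-read behaviour).
        byte[] readAll() throws IOException {
            try (InputStream in = Files.newInputStream(file)) {
                return in.readAllBytes();
            } finally {
                Files.deleteIfExists(file);
            }
        }
    }

    // Returns true if reading the same part a second time fails
    // because the temp file is already gone.
    static boolean secondReadFails(OneShotPart part) throws IOException {
        part.readAll();
        try {
            part.readAll();
            return false;
        } catch (NoSuchFileException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        OneShotPart part = new OneShotPart(new byte[20_000]);
        System.out.println("second read failed: " + secondReadFails(part));
    }
}
```

If this model matches what happens in the application, the failure would only show up for parts large enough to be written to disk, which is consistent with the size threshold described above.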

Can you provide some more details on how you actually run the scenario? Is it one client running in a loop and uploading, or multiple clients running concurrently and if so how many, what is the file size, and when does it start failing?

@spring-projects-issues

Marc-Christian Schulze commented

Let me check my dependencies again and also investigate which HttpMethodFilters are registered. I'll post additional information soon.

Meanwhile, I switched from Jetty to Undertow to check whether this makes any difference, and indeed it does.
Undertow fails consistently, not just sporadically, with a StackOverflowError. This might be a separate issue, but maybe it is not.

Here is the stack trace:

java.lang.StackOverflowError: null
	at org.apache.logging.log4j.core.Logger.isEnabled(Logger.java:239) ~[log4j-core-2.10.0.jar:2.10.0]
	at org.apache.logging.log4j.spi.AbstractLogger.isEnabled(AbstractLogger.java:1497) ~[log4j-api-2.10.0.jar:2.10.0]
	at org.jboss.logging.Log4j2Logger.isEnabled(Log4j2Logger.java:46) ~[jboss-logging-3.3.1.Final.jar:3.3.1.Final]
	at org.jboss.logging.Logger.logf(Logger.java:2397) ~[jboss-logging-3.3.1.Final.jar:3.3.1.Final]
	at org.xnio._private.Messages_$logger.resourceCloseFailed(Messages_$logger.java:963) ~[xnio-api-3.3.8.Final.jar:3.3.8.Final]
	at org.xnio.IoUtils.safeClose(IoUtils.java:138) ~[xnio-api-3.3.8.Final.jar:3.3.8.Final]
	at io.undertow.conduits.ChunkedStreamSourceConduit.read(ChunkedStreamSourceConduit.java:285) ~[undertow-core-1.4.22.Final.jar:1.4.22.Final]
	at org.xnio.conduits.ConduitStreamSourceChannel.read(ConduitStreamSourceChannel.java:127) ~[xnio-api-3.3.8.Final.jar:3.3.8.Final]
	at io.undertow.channels.DetachableStreamSourceChannel.read(DetachableStreamSourceChannel.java:209) ~[undertow-core-1.4.22.Final.jar:1.4.22.Final]
	at io.undertow.server.HttpServerExchange$ReadDispatchChannel.read(HttpServerExchange.java:2332) ~[undertow-core-1.4.22.Final.jar:1.4.22.Final]
	at org.springframework.http.server.reactive.UndertowServerHttpRequest$RequestBodyPublisher.read(UndertowServerHttpRequest.java:172) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.UndertowServerHttpRequest$RequestBodyPublisher.read(UndertowServerHttpRequest.java:128) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.readAndPublish(AbstractListenerReadPublisher.java:145) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.access$1000(AbstractListenerReadPublisher.java:47) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State$4.onDataAvailable(AbstractListenerReadPublisher.java:317) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.onDataAvailable(AbstractListenerReadPublisher.java:85) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.UndertowServerHttpRequest$RequestBodyPublisher.checkOnDataAvailable(UndertowServerHttpRequest.java:156) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.changeToDemandState(AbstractListenerReadPublisher.java:177) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.access$900(AbstractListenerReadPublisher.java:47) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State$4.onDataAvailable(AbstractListenerReadPublisher.java:319) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.onDataAvailable(AbstractListenerReadPublisher.java:85) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.UndertowServerHttpRequest$RequestBodyPublisher.checkOnDataAvailable(UndertowServerHttpRequest.java:156) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.changeToDemandState(AbstractListenerReadPublisher.java:177) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.access$900(AbstractListenerReadPublisher.java:47) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State$4.onDataAvailable(AbstractListenerReadPublisher.java:319) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher.onDataAvailable(AbstractListenerReadPublisher.java:85) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.http.server.reactive.UndertowServerHttpRequest$RequestBodyPublisher.checkOnDataAvailable(UndertowServerHttpRequest.java:156) ~[spring-web-5.0.4.RELEASE.jar:5.0.4.RELEASE]
...

There seems to be infinite recursion when new data becomes available.

Regarding the theory that there are multiple reads ongoing: I have to admit that I am indeed doing some content detection:

// part just arrived as Flux<Part>
Flux<DataBuffer> content = part.content().doOnNext(formatDetector::nextBuffer);
// content is handed over to the proxy to forward to another http endpoint

However, I was assuming that slicing the DataBuffer does not impact the Flux (snippet below from formatDetector):

public void nextBuffer(DataBuffer in) {
  ByteBuffer buffer = in.slice(0, in.readableByteCount()).asByteBuffer();
  // ... now doing reads for content detection on buffer
}

@spring-projects-issues

spring-projects-issues commented Mar 24, 2018

Rossen Stoyanchev commented

The Undertow issue means you're not really on 5.0.5 snapshots. The issue was reported and fixed recently #21088.

Slicing at the level of part.content() should be okay.
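
As an analogy for why slicing is harmless, the sketch below uses java.nio.ByteBuffer rather than Spring's DataBuffer, so it only illustrates the general idea: a slice shares the underlying bytes but keeps its own position, so reading through it does not advance the source. The class SliceDemo and the method peekMagic are hypothetical names chosen for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SliceDemo {

    // Peeks at the first bytes through a slice; the source position stays untouched.
    static String peekMagic(ByteBuffer source, int length) {
        ByteBuffer view = source.slice();   // shares content, independent position and limit
        byte[] magic = new byte[Math.min(length, view.remaining())];
        view.get(magic);                    // advances only the view
        return new String(magic, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.wrap(
                "%PDF-1.7 rest of file".getBytes(StandardCharsets.US_ASCII));
        String magic = peekMagic(source, 4);
        System.out.println(magic + " / source position=" + source.position());
    }
}
```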

@spring-projects-issues

Marc-Christian Schulze commented

I did additional tests but have had no luck so far extracting a reproducible showcase.
However, while investigating I found additional information.

You are right, the issue seems to be related to multiple subscriptions happening on the same multipart.
I now observe similar behavior in other parts of my application that are not related to multipart handling.
In those cases I return a Mono<?> from a web controller method, but the client complains that the server emits more than a single element.
At the network level I can see one HTTP request being sent to the server and one HTTP response being sent back.
However, the HTTP response contains the very same payload twice.
This is what the code looks like:

@ResponseBody
@GetMapping(path = "/somePath/{var}")
public Mono<MyDTO> someMethod(@PathVariable("var") String var) {
  MyDTO dto = new MyDTO();

  return springReactiveDataRepo.findByVar(var) // returns Flux<Entry>
    .doOnNext(entry -> {
      dto.addEntry(entry);
    })
    .then(Mono.just(dto));
}

In the payload I can see the data of MyDTO twice, although this should be impossible since a Mono and not a Flux is returned.

@spring-projects-issues

Rossen Stoyanchev commented

Very strange. Maybe debug in AbstractJackson2Encoder and/or ResponseBodyResultHandler to get more insight?

@spring-projects-issues

Marc-Christian Schulze commented

OK, not exactly what I was looking for but I found one defect in DefaultPartBodyStreamStorageFactory (Line 57).

if (!tempFolder.exists()) {
  // there is a chance that the current thread will transfer execution to a second one here
  if (!tempFolder.mkdirs()) {
    throw new IllegalStateException("Unable to create the temporary folder: " + tempFolderPath);
  }
}

This class is instantiated for each multipart request, which causes race conditions when multiple requests arrive in parallel.
The error message is:

21:27:47.644 [5ac3f1d2477ef9d597bbc658ddc13d76/38a3c53159f98dde] [XNIO-1 I/O-2] ERROR reactor.Flux.Lift.3 - onError(java.lang.IllegalStateException: Unable to create the temporary folder: /tmp/nio-stream-storage)
21:27:47.644 [5ac3f1d2477ef9d597bbc658ddc13d76/38a3c53159f98dde] [XNIO-1 I/O-2] ERROR reactor.Flux.Lift.3 - 
java.lang.IllegalStateException: Unable to create the temporary folder: /tmp/nio-stream-storage
  at org.synchronoss.cloud.nio.multipart.DefaultPartBodyStreamStorageFactory.<init>(DefaultPartBodyStreamStorageFactory.java:58)
  at org.synchronoss.cloud.nio.multipart.Multipart$Builder.partStreamsFactory(Multipart.java:137)
  at org.synchronoss.cloud.nio.multipart.Multipart$Builder.forNIO(Multipart.java:150)
  at org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader$SynchronossPartGenerator.accept(SynchronossPartHttpMessageReader.java:133)
  at org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader$SynchronossPartGenerator.accept(SynchronossPartHttpMessageReader.java:109)
  at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:92)
  at reactor.core.publisher.FluxOnErrorResume.subscribe(FluxOnErrorResume.java:47)
  at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
  at reactor.core.publisher.FluxSwitchIfEmpty.subscribe(FluxSwitchIfEmpty.java:45)
  at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
  at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:50)
  at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
  at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:52)
  at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
  at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
  ...

This could be fixed easily:

if (!tempFolder.exists()) {
  if (!tempFolder.mkdirs()) {
    if (!tempFolder.exists()) {
      throw new IllegalStateException("Unable to create the temporary folder: " + tempFolderPath);
    }
  }
}
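
An alternative to re-checking exists() after a failed mkdirs() is java.nio.file.Files.createDirectories, which does not throw when the directory already exists, so losing the race to another thread is not treated as an error. The sketch below is only an illustration of that idea under the assumption that the factory could use java.nio; TempFolderInit and ensureTempFolder are hypothetical names, not part of the Synchronoss library.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TempFolderInit {

    // Creates the folder if needed; safe to call from many threads at once.
    static Path ensureTempFolder(Path tempFolder) throws IOException {
        // createDirectories succeeds if the directory already exists,
        // so a concurrent creation by another thread is harmless.
        return Files.createDirectories(tempFolder);
    }

    public static void main(String[] args) throws Exception {
        Path base = Files.createTempDirectory("demo");
        Path target = base.resolve("nio-stream-storage");

        // Simulate several requests initialising the folder in parallel.
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Callable<Path> task = () -> ensureTempFolder(target);
        List<Future<Path>> results = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            results.add(pool.submit(task));
        }
        for (Future<Path> result : results) {
            result.get(); // rethrows if any thread failed to obtain the folder
        }
        pool.shutdown();

        System.out.println("folder exists: " + Files.isDirectory(target));
    }
}
```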

However, I'm still debugging the second defect, the one causing the race condition at the file level.

@spring-projects-issues

spring-projects-issues commented Apr 26, 2018

Sébastien Deleuze commented

Marc-Christian Schulze Since #21268 has been fixed, could you please test again with 5.0.6.BUILD-SNAPSHOT builds?

@spring-projects-issues

Viktor commented

I have a similar issue.

The following code reproduces the issue if the file size is > 10 KB (files of <= 10 KB don't trigger it):

import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.core.io.WritableResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.UUID;

@RestController
public class FileController {

    @Value("gs://some-bucket")
    private Resource gcsResource;

    @PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<String> upload(@RequestPart Mono<FilePart> file) {

        return file.flatMap(this::validate)
                   .flatMap(this::getByteArray)
                   .flatMap(bytes -> {
                       var blobId = UUID.randomUUID().toString();
                       try {
                           gcsResource = gcsResource.createRelative(blobId);
                           try (OutputStream os = ((WritableResource) gcsResource).getOutputStream()) {
                               os.write(bytes);
                           }
                       } catch (IOException e) {
                       }

                       return Mono.just(blobId);
                   });
    }

    private Mono<FilePart> validate(FilePart filePart) {
        return getByteArray(filePart)
                   .filter(bytes -> bytes.length <= 5L * FileUtils.ONE_MB)
                   .switchIfEmpty(Mono.error(new Exception()))
                   .thenReturn(filePart);
    }

    private Mono<byte[]> getByteArray(FilePart filePart) {
        return DataBufferUtils.join(filePart.content())
                              .map(DataBuffer::asByteBuffer)
                              .map(ByteBuffer::array);
    }
}

If you remove the validation part, the problem goes away.
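
One reading consistent with the double-consumption hypothesis raised earlier in this thread is that validate() and upload() each call getByteArray(), so the part content is read twice. The following JDK-only sketch shows the alternative shape, reading the content once and validating the bytes already in memory. It uses a plain InputStream as a stand-in for the part content; ReadOnceUpload and readOnceAndValidate are hypothetical names, and this is not a drop-in replacement for the reactive code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadOnceUpload {

    // Size limit taken from the reproducer above (5 MB).
    static final long MAX_BYTES = 5L * 1024 * 1024;

    // Reads the content exactly once and validates the bytes already in memory.
    static byte[] readOnceAndValidate(InputStream content) throws IOException {
        byte[] bytes = content.readAllBytes();
        if (bytes.length > MAX_BYTES) {
            throw new IllegalArgumentException("Upload exceeds " + MAX_BYTES + " bytes");
        }
        return bytes;
    }

    public static void main(String[] args) throws IOException {
        InputStream content = new ByteArrayInputStream(new byte[20_000]);
        byte[] bytes = readOnceAndValidate(content);
        // The validated bytes are what would be uploaded; the source is not read again.
        System.out.println("validated " + bytes.length + " bytes");
    }
}
```

In the reactive controller, the equivalent would presumably be to join filePart.content() a single time and apply the size check to the resulting byte array, rather than joining once for validation and again for the upload.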

 

build.gradle file:

buildscript {
    ext {
        springBootVersion = '2.0.3.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.somegroup'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 10

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/milestone" }
}

ext {
    springCloudVersion = 'Finchley.RELEASE'
    springCloudGcpVersion = '1.0.0.M3'
    apacheCommonsIoVersion = '2.6'
}

dependencies {
    compile("commons-io:commons-io:${apacheCommonsIoVersion}")
    compile('org.springframework.boot:spring-boot-starter-webflux')
    compile('org.springframework.cloud:spring-cloud-gcp-starter-storage')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
        mavenBom "org.springframework.cloud:spring-cloud-gcp-dependencies:${springCloudGcpVersion}"
    }
}

 

Here is the stack trace:

java.lang.IllegalStateException: Unable to create the inputStream.
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:324) ~[nio-stream-storage-1.1.3.jar:na]
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.getInputStream(FileStreamStorage.java:245) ~[nio-stream-storage-1.1.3.jar:na]
	at org.springframework.core.io.buffer.DataBufferUtils.lambda$readInputStream$1(DataBufferUtils.java:97) ~[spring-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
	at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:76) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:47) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.FluxTake$TakeSubscriber.onNext(FluxTake.java:122) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:312) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:199) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:233) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:64) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Flux.subscribe(Flux.java:6877) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoProcessor.subscribe(MonoProcessor.java:457) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany.subscribe(MonoFlatMapMany.java:49) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxTake.subscribe(FluxTake.java:56) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:190) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:239) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:80) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at 
reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onComplete(FluxFilterFuseable.java:151) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1085) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:104) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at 
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:117) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onComplete(FluxUsing.java:385) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxGenerate$GenerateSubscription.complete(FluxGenerate.java:193) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:537) [spring-core-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:508) [spring-core-5.0.7.RELEASE.jar:5.0.7.RELEASE] at reactor.core.publisher.FluxGenerate.lambda$new$1(FluxGenerate.java:56) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxGenerate$GenerateSubscription.fastPath(FluxGenerate.java:223) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:202) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) 
[spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.request(FluxUsing.java:318) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onSubscribe(MonoCollectList.java:90) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onSubscribe(FluxUsing.java:345) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:103) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:47) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) 
~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxTake$TakeSubscriber.onNext(FluxTake.java:122) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) 
[spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:312) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:199) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:233) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:64) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.Flux.subscribe(Flux.java:6877) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389) 
[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:142) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:409) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:717) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:671) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$SerializedSink.drainLoop(FluxCreate.java:226) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$SerializedSink.drain(FluxCreate.java:197) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at 
reactor.core.publisher.FluxCreate$SerializedSink.complete(FluxCreate.java:192) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader$FluxSinkAdapterListener.onAllPartsFinished(SynchronossPartHttpMessageReader.java:231) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.synchronoss.cloud.nio.multipart.NioMultipartParser.allPartsRead(NioMultipartParser.java:603) ~[nio-multipart-parser-1.1.0.jar:na] at org.synchronoss.cloud.nio.multipart.NioMultipartParser.write(NioMultipartParser.java:449) ~[nio-multipart-parser-1.1.0.jar:na] at org.synchronoss.cloud.nio.multipart.NioMultipartParser.write(NioMultipartParser.java:370) ~[nio-multipart-parser-1.1.0.jar:na] at org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader$SynchronossPartGenerator.lambda$accept$0(SynchronossPartHttpMessageReader.java:150) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at 
reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:309) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:405) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:136) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at reactor.ipc.netty.http.server.HttpServerHandler.channelRead(HttpServerHandler.java:164) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.25.Final.jar:4.1.25.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:547) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:501) 
~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.25.Final.jar:4.1.25.Final] at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
Caused by: java.io.FileNotFoundException: /var/folders/w3/yb42hn3j3dq55rgf8nylz03c0000gn/T/nio-file-upload/nio-body-4-f8674d32-a520-4531-9739-9c1071b6ccbd.tmp (No such file or directory)
	at java.base/java.io.FileInputStream.open0(Native Method) ~[na:na]
	at java.base/java.io.FileInputStream.open(FileInputStream.java:220) ~[na:na]
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:158) ~[na:na]
	at org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream.<init>(NameAwarePurgableFileInputStream.java:49) ~[nio-stream-storage-1.1.3.jar:na]
	at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:322) ~[nio-stream-storage-1.1.3.jar:na]
	... 178 common frames omitted

 

Maybe I am doing something wrong, but how can that be fixed?

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

Viktor, yes a couple of things.

One, with validate the code consumes the content twice: once inside the validate method with getByteArray, and then a second time in the flatMap, also with getByteArray. However, you can consume it once. If you want to validate the whole content, then once it's buffered into a byte[], keep it that way and pass it down the chain. There is no need to consume it again.

Effectively you want to end up with a chain that looks like this:

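getByteArray(filePart)
        .doOnNext(bytes -> {
                // Validate the buffered content once. doOnNext takes a Consumer,
                // so only an unchecked exception can be thrown from here.
                if (bytes.length <= 5L * FileUtils.ONE_MB) {
                        throw new IllegalStateException("...");
                }
        })
        .map(bytes -> {
                // write here...
                return blobId;
        });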
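
The same consume-once idea can be sketched without any Spring or Reactor dependency. This is only an illustration under stated assumptions: `CompletableFuture` stands in for `Mono`, and the class name, the `reads` counter, the `MAX_BYTES` limit, and the `"blob-"` id format are all hypothetical and not taken from the code discussed in this issue. The point it shows is that the content is buffered a single time and the same `byte[]` is validated and then passed on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumeOnceSketch {

    // Hypothetical size limit, chosen only for this illustration.
    static final int MAX_BYTES = 8;

    // Counts how often the (simulated) upload content is read.
    static final AtomicInteger reads = new AtomicInteger();

    // Stand-in for getByteArray(filePart): buffers the whole content once.
    static CompletableFuture<byte[]> getByteArray(byte[] content) {
        return CompletableFuture.supplyAsync(() -> {
            reads.incrementAndGet();
            return content.clone();
        });
    }

    // Validates the buffered bytes, then hands the same array to the next step.
    static CompletableFuture<String> store(byte[] content) {
        return getByteArray(content)
                .thenApply(bytes -> {
                    if (bytes.length > MAX_BYTES) {
                        throw new IllegalArgumentException("content too large");
                    }
                    return bytes; // pass the already-buffered bytes along
                })
                // The "write here..." step; returns a made-up blob id.
                .thenApply(bytes -> "blob-" + bytes.length);
    }

    public static void main(String[] args) {
        System.out.println(store(new byte[] {1, 2, 3}).join());
        System.out.println("reads = " + reads.get());
    }
}
```

Because validation and storage hang off the same future, the source is read once per upload; a failed validation surfaces as an exceptional completion instead of triggering a second read.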

An additional point is that the write to the OutputStream may block, depending on the Resource type, and that's not allowed in the concurrency model for WebFlux. You can use DataBufferUtils#write to write a publisher of data buffers to an OutputStream.

@spring-projects-issues spring-projects-issues added type: bug A general bug in: core Issues in core modules (aop, beans, core, context, expression) in: web Issues in web modules (web, webmvc, webflux, websocket) labels Jan 11, 2019
@spring-projects-issues spring-projects-issues removed the type: bug A general bug label Jan 11, 2019
@rstoyanchev rstoyanchev added the status: invalid An issue that we don't feel is valid label Dec 5, 2019