Race-Condition in Multipart upload proxy scenario [SPR-16639] #21180
Comments
Rossen Stoyanchev commented Since there is no repro project, can you try with Spring Framework 5.0.5.BUILD-SNAPSHOT and Reactor Netty 0.7.6.BUILD-SNAPSHOT? |
Marc-Christian Schulze commented I've upgraded both dependencies as you advised but the error is still there:
|
Rossen Stoyanchev commented For the Spring Boot dependency did you really mean 2.0.0.RELEASE (2.0.0.RC1 doesn't match the rest) ? The Synchronoss multipart library doesn't quite go as far as supporting Reactive Streams, so currently the request body is consumed without back pressure. By default it seems, part content is kept in memory only if the Content-Length < 10K, and otherwise if greater (or unknown) it is written to disk. Reading from the files is then backpressured (via DataBufferUtils) but the writing to the files is not. We could probably streamline this by plugging in a custom The error suggests when it's time to read, it can't find the temp file where the part content was written. Looking at the The only remotely plausible scenarios are -- temp file name clash (one getting cleaned up first) but that also seems unlikely since it's using UUID to create a unique file name; or multipart content consumed twice, possibly indirectly, e.g. via Can you provide some more details on how you actually run the scenario? Is it one client running in a loop and uploading, or multiple clients running concurrently and if so how many, what is the file size, and when does it start failing? |
Marc-Christian Schulze commented Let me check my dependencies again and also investigate which Meanwhile I changed from Jetty to Undertow checking whether this makes any difference and indeed there is one. Here is the stack trace:
Seems there is an infinite recursive loop when new data is available. Referring to the theory that there are multiple reads on-going: I have to admit that indeed I'm doing some content detection:
However, I was assuming that slicing the DataBuffer does not impact the Flux (snippet below from formatDetector):
|
Rossen Stoyanchev commented The Undertow issue means you're not really on 5.0.5 snapshots. The issue was reported and fixed recently #21088. Slicing at the level of |
Marc-Christian Schulze commented I did additional tests but have had no luck so far extracting a reproducible showcase. You are right, the issue seems to be related to multiple subscriptions happening on the same multipart.
@ResponseBody
@GetMapping(path = "/somePath/{var}")
public Mono<MyDTO> someMethod(@PathVariable("var") String var) {
MyDTO dto = new MyDTO();
return springReactiveDataRepo.findByVar(var) // returns Flux<Entry>
.doOnNext(entry -> {
dto.addEntry(entry);
})
.then(Mono.just(dto));
}
In the payload I can see the data of MyDTO twice, although this should be impossible since a Mono and not a Flux is returned. |
Rossen Stoyanchev commented Very strange. Maybe debug in AbstractJackson2Encoder and/or ResponseBodyResultHandler to get more insight? |
Marc-Christian Schulze commented OK, not exactly what I was looking for but I found one defect in DefaultPartBodyStreamStorageFactory (Line 57). if (!tempFolder.exists()) {
// there is a chance that the current thread will transfer execution to a second one here
if (!tempFolder.mkdirs()) {
throw new IllegalStateException("Unable to create the temporary folder: " + tempFolderPath);
}
}
This class is instantiated for each multipart, which causes race conditions when multiple requests arrive in parallel.
Could be easily fixed: if (!tempFolder.exists()) {
if (!tempFolder.mkdirs()) {
if (!tempFolder.exists()) {
throw new IllegalStateException("Unable to create the temporary folder: " + tempFolderPath);
}
}
}
However, I'm still debugging the second defect, which causes the race condition at the file level. |
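For illustration, the check-then-create race described above can also be avoided with the JDK's `Files.createDirectories`, which does not fail when the directory already exists. This is only a sketch, not the Synchronoss code or its actual fix: the class name `TempFolders` and the method `ensureExists` are hypothetical, and only the error message is taken from the snippet above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the Synchronoss library.
final class TempFolders {

    private TempFolders() {
    }

    /**
     * Creates the directory (and any missing parents) if needed.
     * Files.createDirectories returns normally when the directory already
     * exists, so two threads racing to create the same folder both succeed.
     */
    static Path ensureExists(Path tempFolder) {
        try {
            return Files.createDirectories(tempFolder);
        } catch (IOException e) {
            // Reached only if the path cannot be created at all,
            // for example when it exists but is a regular file.
            throw new IllegalStateException(
                    "Unable to create the temporary folder: " + tempFolder, e);
        }
    }
}
```

Calling `TempFolders.ensureExists(path)` from several request threads at once should then leave every caller with an existing directory rather than an exception for the thread that lost the race.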
Sébastien Deleuze commented Marc-Christian Schulze Since #21268 has been fixed, could you please test again with |
Viktor commented I have a similar issue. Here is code that reproduces it when the file size is > 10 KB (files <= 10 KB don't trigger it):
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.core.io.WritableResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.UUID;
@RestController
public class FileController {
@Value("gs://some-bucket")
private Resource gcsResource;
@PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
@ResponseStatus(HttpStatus.CREATED)
public Mono<String> upload(@RequestPart Mono<FilePart> file) {
return file.flatMap(this::validate)
.flatMap(this::getByteArray)
.flatMap(bytes -> {
var blobId = UUID.randomUUID().toString();
try {
gcsResource = gcsResource.createRelative(blobId);
try (OutputStream os = ((WritableResource) gcsResource).getOutputStream()) {
os.write(bytes);
}
} catch (IOException e) {
}
return Mono.just(blobId);
});
}
private Mono<FilePart> validate(FilePart filePart) {
return getByteArray(filePart)
.filter(bytes -> bytes.length <= 5L * FileUtils.ONE_MB)
.switchIfEmpty(Mono.error(new Exception()))
.thenReturn(filePart);
}
private Mono<byte[]> getByteArray(FilePart filePart) {
return DataBufferUtils.join(filePart.content())
.map(DataBuffer::asByteBuffer)
.map(ByteBuffer::array);
}
}
If you remove the validation part, the problem goes away.
build.gradle file: buildscript {
ext {
springBootVersion = '2.0.3.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.somegroup'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 10
repositories {
mavenCentral()
maven { url "https://repo.spring.io/milestone" }
}
ext {
springCloudVersion = 'Finchley.RELEASE'
springCloudGcpVersion = '1.0.0.M3'
apacheCommonsIoVersion = '2.6'
}
dependencies {
compile("commons-io:commons-io:${apacheCommonsIoVersion}")
compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.cloud:spring-cloud-gcp-starter-storage')
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
mavenBom "org.springframework.cloud:spring-cloud-gcp-dependencies:${springCloudGcpVersion}"
}
}
There is a stack trace: java.lang.IllegalStateException: Unable to create the inputStream. at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:324) ~[nio-stream-storage-1.1.3.jar:na] at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.getInputStream(FileStreamStorage.java:245) ~[nio-stream-storage-1.1.3.jar:na] at org.springframework.core.io.buffer.DataBufferUtils.lambda$readInputStream$1(DataBufferUtils.java:97) ~[spring-core-5.0.7.RELEASE.jar:5.0.7.RELEASE] at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:76) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:47) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) 
~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxTake$TakeSubscriber.onNext(FluxTake.java:122) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:312) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:199) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at 
reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:233) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:64) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.Flux.subscribe(Flux.java:6877) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoProcessor.subscribe(MonoProcessor.java:457) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMapMany.subscribe(MonoFlatMapMany.java:49) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxTake.subscribe(FluxTake.java:56) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) 
[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:190) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:239) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:80) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at 
reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onComplete(FluxFilterFuseable.java:151) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1085) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:104) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at 
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:117) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onComplete(FluxUsing.java:385) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxGenerate$GenerateSubscription.complete(FluxGenerate.java:193) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:537) [spring-core-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:508) [spring-core-5.0.7.RELEASE.jar:5.0.7.RELEASE] at reactor.core.publisher.FluxGenerate.lambda$new$1(FluxGenerate.java:56) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxGenerate$GenerateSubscription.fastPath(FluxGenerate.java:223) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:202) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) 
[spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.request(FluxUsing.java:318) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onSubscribe(MonoCollectList.java:90) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onSubscribe(FluxUsing.java:345) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:103) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:47) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) 
~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxTake$TakeSubscriber.onNext(FluxTake.java:122) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:238) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) 
[spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:312) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:199) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:233) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:140) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:64) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.Flux.subscribe(Flux.java:6877) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389) 
[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:142) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:90) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:409) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:717) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:671) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$SerializedSink.drainLoop(FluxCreate.java:226) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at reactor.core.publisher.FluxCreate$SerializedSink.drain(FluxCreate.java:197) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at 
reactor.core.publisher.FluxCreate$SerializedSink.complete(FluxCreate.java:192) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader$FluxSinkAdapterListener.onAllPartsFinished(SynchronossPartHttpMessageReader.java:231) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at org.synchronoss.cloud.nio.multipart.NioMultipartParser.allPartsRead(NioMultipartParser.java:603) ~[nio-multipart-parser-1.1.0.jar:na] at org.synchronoss.cloud.nio.multipart.NioMultipartParser.write(NioMultipartParser.java:449) ~[nio-multipart-parser-1.1.0.jar:na] at org.synchronoss.cloud.nio.multipart.NioMultipartParser.write(NioMultipartParser.java:370) ~[nio-multipart-parser-1.1.0.jar:na] at org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader$SynchronossPartGenerator.lambda$accept$0(SynchronossPartHttpMessageReader.java:150) ~[spring-web-5.0.7.RELEASE.jar:5.0.7.RELEASE] at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE] at 
reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:309) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:405) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:136) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at reactor.ipc.netty.http.server.HttpServerHandler.channelRead(HttpServerHandler.java:164) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.25.Final.jar:4.1.25.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:547) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:501) 
~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.25.Final.jar:4.1.25.Final] at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
Caused by: java.io.FileNotFoundException: /var/folders/w3/yb42hn3j3dq55rgf8nylz03c0000gn/T/nio-file-upload/nio-body-4-f8674d32-a520-4531-9739-9c1071b6ccbd.tmp (No such file or directory) at java.base/java.io.FileInputStream.open0(Native Method) ~[na:na] at java.base/java.io.FileInputStream.open(FileInputStream.java:220) ~[na:na] at java.base/java.io.FileInputStream.<init>(FileInputStream.java:158) ~[na:na] at org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream.<init>(NameAwarePurgableFileInputStream.java:49) ~[nio-stream-storage-1.1.3.jar:na] at org.synchronoss.cloud.nio.stream.storage.FileStreamStorage.newFileInputStream(FileStreamStorage.java:322) ~[nio-stream-storage-1.1.3.jar:na] ... 178 common frames omitted
Maybe I am doing something wrong, but how can that be fixed? |
Rossen Stoyanchev commented Viktor, yes, a couple of things. One, with validate the code consumes the content twice: once inside the validate method with getByteArray, and then a second time in the flatMap, also with getByteArray. You only need to consume it once. If you want to validate the whole content, then once it's buffered into a byte[], keep it that way and pass it down the chain. No need to consume it again. Effectively you want to end up with a chain that looks like this:
getByteArray(filePart)
    .doOnNext(bytes -> {
        // Use an unchecked exception: a checked one cannot be thrown from this lambda.
        if (bytes.length <= 5L * FileUtils.ONE_MB) {
            throw new IllegalStateException("...");
        }
    })
    .map(bytes -> {
        // write here...
        return blobId;
    });
An additional point is that the write to the OutputStream may block, depending on the Resource type, and that's not allowed in the concurrency model for WebFlux. You can use DataBufferUtils#write to write a publisher of data buffers to an OutputStream. |
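To make the "consume once" advice concrete without pulling in Spring or Reactor, here is a minimal JDK-only sketch. It is an analogue, not the WebFlux API: CompletableFuture stands in for the reactive chain, and the class ConsumeOnce, the methods readOnce and validateAndStore, and the MAX_BYTES limit are all hypothetical names invented for illustration. The assumption being modelled is a part whose content sits in a temp file that is gone after the first read, so the bytes are buffered once, validated, and the same array is passed on to the write step.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

public class ConsumeOnce {

    // Hypothetical size limit, for illustration only.
    static final long MAX_BYTES = 5L * 1024 * 1024;

    // Hypothetical stand-in for a multipart part backed by a temp file.
    // The file is deleted after the first read, so a second read would fail.
    static CompletableFuture<byte[]> readOnce(Path tempFile) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                byte[] bytes = Files.readAllBytes(tempFile);
                Files.delete(tempFile);
                return bytes;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    // Read once, validate the buffered bytes, then hand the same array to the write step.
    static CompletableFuture<Path> validateAndStore(Path tempFile, Path target) {
        return readOnce(tempFile)
            .thenApply(bytes -> {
                if (bytes.length > MAX_BYTES) {
                    throw new IllegalArgumentException("upload too large");
                }
                return bytes;
            })
            .thenApply(bytes -> {
                try {
                    return Files.write(target, bytes);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
    }

    // Small driver: stores a three-byte "upload" and returns the stored length.
    static int demo() {
        try {
            Path source = Files.createTempFile("part-", ".tmp");
            Files.write(source, new byte[] {1, 2, 3});
            Path target = Files.createTempFile("blob-", ".bin");
            Path stored = validateAndStore(source, target).join();
            return Files.readAllBytes(stored).length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the write step still blocks the thread it runs on; in a WebFlux application that part would be replaced by a non-blocking write such as the DataBufferUtils#write call mentioned above.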
Marc-Christian Schulze opened SPR-16639 and commented
I'm facing exceptions in my WebFlux Spring Boot application that are all related to the handling of multipart web requests. Once these requests hit the threshold and need to be stored temporarily in the filesystem, sporadic race conditions occur.
My Dependencies:
Typically it fails because it was unable to create the directory /tmp/nio-stream-storage, but if I look into the /tmp directory I can see that it was not present before and has been created. If I run the application again, it fails sporadically while looking up the temporary files inside the folder /tmp/nio-stream-storage:
This smells like a race condition in the handling of this temporary directory and the files it contains.
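One way a "could not create directory" failure can appear even though the directory exists afterwards is a check-then-act race around File.mkdirs(), which returns false when the directory is already there. Whether the upload library actually does this is an assumption, not something confirmed in this report. The sketch below uses only the JDK; the class TempDirRace and its methods are hypothetical names. It contrasts the racy pattern with Files.createDirectories, which does not fail when the directory already exists.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TempDirRace {

    // Racy check-then-act: two threads can both see exists() == false,
    // one mkdirs() wins, and the other gets false and throws,
    // although the directory is present by then.
    static void ensureDirRacy(File dir) {
        if (!dir.exists() && !dir.mkdirs()) {
            throw new IllegalStateException("Unable to create directory " + dir);
        }
    }

    // Safer variant: createDirectories succeeds if the directory already exists.
    static void ensureDirSafe(Path dir) {
        try {
            Files.createDirectories(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // mkdirs() on a directory that already exists reports false.
    static boolean mkdirsOnExistingDir() {
        try {
            File dir = Files.createTempDirectory("race-demo").toFile();
            return dir.mkdirs();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs ensureDirSafe from several threads and counts how many succeeded.
    static int concurrentSafeCreates(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Path dir = Files.createTempDirectory("race-demo").resolve("nio-stream-storage");
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                tasks.add(() -> {
                    ensureDirSafe(dir);
                    return Files.isDirectory(dir);
                });
            }
            int ok = 0;
            for (Future<Boolean> result : pool.invokeAll(tasks)) {
                if (result.get()) {
                    ok++;
                }
            }
            return ok;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The racy variant is shown only for comparison and is deliberately not exercised concurrently, since whether it fails depends on thread timing.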
BTW, is it actually intended that WebFlux writes uploaded multipart files to disk? I thought the actual intention of being reactive is to leverage backpressure propagation.
Unfortunately, I can't provide a reproducible example.
However, reduced to a skeleton, my code would look like this (basically a multi-file upload proxy):
Affects: 5.0.4
Issue Links:
Referenced from: commits a989ea0