Skip to content

Commit

Permalink
Polish Synchronoss message reader
Browse files Browse the repository at this point in the history
Issue: SPR-16639
  • Loading branch information
rstoyanchev committed Mar 24, 2018
1 parent 729d0d2 commit a989ea0
Showing 1 changed file with 33 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,27 @@ public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage
Map<String, Object> hints) {

return Mono.error(new UnsupportedOperationException(
"This reader does not support reading a single element."));
"Can't read a multipart request body into a single Part."));
}


/**
* Consume and feed input to the Synchronoss parser, then adapt parser
* output events to {@code Flux<Sink<Part>>}.
* Consume and feed input to the Synchronoss parser, then listen for parser
* output events and adapt to {@code Flux<Sink<Part>>}.
*/
private static class SynchronossPartGenerator implements Consumer<FluxSink<Part>> {

private final ReactiveHttpInputMessage inputMessage;

private final DataBufferFactory bufferFactory;


SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage, DataBufferFactory factory) {
this.inputMessage = inputMessage;
this.bufferFactory = factory;
}


@Override
public void accept(FluxSink<Part> emitter) {
HttpHeaders headers = this.inputMessage.getHeaders();
Expand Down Expand Up @@ -176,12 +178,14 @@ private static class FluxSinkAdapterListener implements NioMultipartParserListen

private final AtomicInteger terminated = new AtomicInteger(0);

FluxSinkAdapterListener(FluxSink<Part> sink, DataBufferFactory bufferFactory, MultipartContext context) {

FluxSinkAdapterListener(FluxSink<Part> sink, DataBufferFactory factory, MultipartContext context) {
this.sink = sink;
this.bufferFactory = bufferFactory;
this.bufferFactory = factory;
this.context = context;
}


@Override
public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) {
HttpHeaders httpHeaders = new HttpHeaders();
Expand All @@ -192,14 +196,14 @@ public void onPartFinished(StreamStorage storage, Map<String, List<String>> head
private Part createPart(StreamStorage storage, HttpHeaders httpHeaders) {
String filename = MultipartUtils.getFileName(httpHeaders);
if (filename != null) {
return new SynchronossFilePart(httpHeaders, storage, this.bufferFactory, filename);
return new SynchronossFilePart(httpHeaders, filename, storage, this.bufferFactory);
}
else if (MultipartUtils.isFormField(httpHeaders, this.context)) {
String value = MultipartUtils.readFormParameterValue(storage, httpHeaders);
return new SynchronossFormFieldPart(httpHeaders, this.bufferFactory, value);
}
else {
return new DefaultSynchronossPart(httpHeaders, storage, this.bufferFactory);
return new SynchronossPart(httpHeaders, storage, this.bufferFactory);
}
}

Expand Down Expand Up @@ -229,47 +233,53 @@ public void onNestedPartFinished() {

private abstract static class AbstractSynchronossPart implements Part {

private final String name;

private final HttpHeaders headers;

private final DataBufferFactory bufferFactory;


AbstractSynchronossPart(HttpHeaders headers, DataBufferFactory bufferFactory) {
Assert.notNull(headers, "HttpHeaders is required");
Assert.notNull(bufferFactory, "'bufferFactory' is required");
this.name = MultipartUtils.getFieldName(headers);
this.headers = headers;
this.bufferFactory = bufferFactory;
}


@Override
public String name() {
return MultipartUtils.getFieldName(this.headers);
return this.name;
}

@Override
public HttpHeaders headers() {
return this.headers;
}

protected DataBufferFactory getBufferFactory() {
DataBufferFactory getBufferFactory() {
return this.bufferFactory;
}
}


private static class DefaultSynchronossPart extends AbstractSynchronossPart {
private static class SynchronossPart extends AbstractSynchronossPart {

private final StreamStorage storage;

DefaultSynchronossPart(HttpHeaders headers, StreamStorage storage, DataBufferFactory factory) {

SynchronossPart(HttpHeaders headers, StreamStorage storage, DataBufferFactory factory) {
super(headers, factory);
Assert.notNull(storage, "'storage' is required");
this.storage = storage;
}


@Override
public Flux<DataBuffer> content() {
return DataBufferUtils.readInputStream(this.storage::getInputStream, getBufferFactory(),
4096);
return DataBufferUtils.readInputStream(getStorage()::getInputStream, getBufferFactory(), 4096);
}

protected StreamStorage getStorage() {
Expand All @@ -278,20 +288,23 @@ protected StreamStorage getStorage() {
}


private static class SynchronossFilePart extends DefaultSynchronossPart implements FilePart {
private static class SynchronossFilePart extends SynchronossPart implements FilePart {

private static final OpenOption[] FILE_CHANNEL_OPTIONS = {
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE };


private final String filename;

public SynchronossFilePart(
HttpHeaders headers, StreamStorage storage, DataBufferFactory factory, String filename) {

SynchronossFilePart(HttpHeaders headers, String filename, StreamStorage storage,
DataBufferFactory factory) {

super(headers, storage, factory);
this.filename = filename;
}


@Override
public String filename() {
return this.filename;
Expand Down Expand Up @@ -342,11 +355,13 @@ private static class SynchronossFormFieldPart extends AbstractSynchronossPart im

private final String content;


SynchronossFormFieldPart(HttpHeaders headers, DataBufferFactory bufferFactory, String content) {
super(headers, bufferFactory);
this.content = content;
}


@Override
public String value() {
return this.content;
Expand All @@ -361,8 +376,8 @@ public Flux<DataBuffer> content() {
}

private Charset getCharset() {
return Optional.ofNullable(MultipartUtils.getCharEncoding(headers()))
.map(Charset::forName).orElse(StandardCharsets.UTF_8);
String name = MultipartUtils.getCharEncoding(headers());
return (name != null ? Charset.forName(name) : StandardCharsets.UTF_8);
}
}

Expand Down

0 comments on commit a989ea0

Please sign in to comment.