Skip to content

Commit

Permalink
fix streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
gaeljw committed Nov 1, 2020
1 parent 3f79ac7 commit 2263992
Showing 1 changed file with 39 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.function.Supplier

import akka.stream.scaladsl.Source
import akka.util.ByteString
import play.api.libs.ws.DefaultBodyReadables._
import play.api.libs.ws.DefaultBodyWritables._
import play.api.libs.ws._
import sttp.capabilities.Streams
import sttp.capabilities.akka.AkkaStreams
import sttp.model.Method
import sttp.tapir.Codec.PlainCodec
import sttp.tapir.internal.{CombineParams, Params, ParamsAsAny, RichEndpointInput, RichEndpointOutput, SplitParams}
Expand Down Expand Up @@ -168,8 +168,8 @@ class EndpointToPlayClient(clientOptions: PlayClientOptions, ws: StandaloneWSCli
case EndpointIO.Body(bodyType, codec, _) =>
val req2 = setBody(value, bodyType, codec, req)
req2
case sbw @ EndpointIO.StreamBodyWrapper(_) =>
val req2 = setStreamingBody(value, sbw.codec, req)
case EndpointIO.StreamBodyWrapper(StreamBodyIO(streams, _, _, _)) =>
val req2 = setStreamingBody(streams)(value.asInstanceOf[streams.BinaryStream], req)
req2
case EndpointIO.Header(name, codec, _) =>
val req2 = codec
Expand Down Expand Up @@ -270,18 +270,10 @@ class EndpointToPlayClient(clientOptions: PlayClientOptions, ws: StandaloneWSCli
req2
}

private def setStreamingBody[R, T, CF <: CodecFormat](v: T, codec: Codec[R, T, CF], req: StandaloneWSRequest): StandaloneWSRequest = {
val encoded: R = codec.encode(v)
encoded match {
case s: Source[ByteString, _] => req.withBody(s)
case is: InputStream =>
// For some reason, Play comes with a Writeable for Supplier[InputStream] but not InputStream directly
val inputStreamSupplier: Supplier[InputStream] = () => is
req.withBody(inputStreamSupplier)
case f: File => req.withBody(f)
case _ =>
// TODO what about other types that might have a BodyWritable defined out there?
throw new IllegalArgumentException("Streaming input other than Source, InputStream or File aren't supported")
private def setStreamingBody[S](streams: Streams[S])(v: streams.BinaryStream, req: StandaloneWSRequest): StandaloneWSRequest = {
streams match {
case AkkaStreams => req.withBody(v.asInstanceOf[AkkaStreams.BinaryStream])
case _ => throw new IllegalArgumentException("Only AkkaStreams streaming is supported")
}
}

Expand Down Expand Up @@ -311,23 +303,37 @@ class EndpointToPlayClient(clientOptions: PlayClientOptions, ws: StandaloneWSCli
case f => throw new IllegalArgumentException(s"Cannot decode: $f")
}

def responseFromOutput(out: EndpointOutput[_]): StandaloneWSResponse => Any = { response =>
out.bodyType
.map {
case RawBodyType.StringBody(_) => response.body
case RawBodyType.ByteArrayBody => response.body[Array[Byte]]
case RawBodyType.ByteBufferBody => response.body[ByteBuffer]
case RawBodyType.InputStreamBody => new ByteArrayInputStream(response.body[Array[Byte]])
case RawBodyType.FileBody =>
// TODO Consider using bodyAsSource to not load the whole content in memory
val f = clientOptions.createFile()
val outputStream = Files.newOutputStream(f.toPath)
outputStream.write(response.body[Array[Byte]])
outputStream.close()
f
case RawBodyType.MultipartBody(_, _) => throw new IllegalArgumentException("Multipart bodies aren't supported in responses")
}
.getOrElse(()) // Unit
private def responseFromOutput(out: EndpointOutput[_]): StandaloneWSResponse => Any = { response =>
bodyIsStream(out) match {
case Some(streams) =>
streams match {
case AkkaStreams => response.body[AkkaStreams.BinaryStream]
case _ => throw new IllegalArgumentException("Only AkkaStreams streaming is supported")
}
case None =>
out.bodyType
.map {
case RawBodyType.StringBody(_) => response.body
case RawBodyType.ByteArrayBody => response.body[Array[Byte]]
case RawBodyType.ByteBufferBody => response.body[ByteBuffer]
case RawBodyType.InputStreamBody => new ByteArrayInputStream(response.body[Array[Byte]])
case RawBodyType.FileBody =>
// TODO Consider using bodyAsSource to not load the whole content in memory
val f = clientOptions.createFile()
val outputStream = Files.newOutputStream(f.toPath)
outputStream.write(response.body[Array[Byte]])
outputStream.close()
f
case RawBodyType.MultipartBody(_, _) => throw new IllegalArgumentException("Multipart bodies aren't supported in responses")
}
.getOrElse(()) // Unit
}
}

private def bodyIsStream[I](out: EndpointOutput[I]): Option[Streams[_]] = {
out.traverseOutputs { case EndpointIO.StreamBodyWrapper(StreamBodyIO(streams, _, _, _)) =>
Vector(streams)
}.headOption
}

}

0 comments on commit 2263992

Please sign in to comment.