Skip to content

Commit

Permalink
Merge pull request #243 from yanns/servlet31output
Browse files Browse the repository at this point in the history
issue #95: asynchron output for servlet 3.1
  • Loading branch information
dlecan committed May 12, 2014
2 parents 08aa3f0 + b580b8c commit 76f494a
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,147 @@ trait HttpServletRequestHandler extends RequestHandler {
bodyEnumerator |>>> bodyParser
}

protected def setHeaders(headers: Map[String, String], httpResponse: HttpServletResponse): Unit = {
// Set response headers
headers.filterNot(_ ==(CONTENT_LENGTH, "-1")).foreach {

// Fix a bug for Set-Cookie header.
// Multiple cookies could be merged in a single header
// but it's not properly supported by some browsers
case (name@play.api.http.HeaderNames.SET_COOKIE, value) => {
getServletCookies(value).map {
c => httpResponse.addCookie(c)
}
}

case (name, value) => httpResponse.setHeader(name, value)
}
}

/**
* default implementation to push a play result to the servlet output stream
* @param futureResult the result of the play action
* @param cleanup clean up callback
*/
protected def pushPlayResultToServletOS(futureResult: Future[SimpleResult], cleanup: () => Unit): Unit = {
// TODO: should use the servlet thread here or use special thread pool for blocking IO operations
// (https://github.com/dlecan/play2-war-plugin/issues/223)
import play.api.libs.iteratee.Execution.Implicits.trampoline

futureResult.map { result =>
getHttpResponse().getHttpServletResponse.foreach { httpResponse =>

val status = result.header.status
val headers = result.header.headers
val body: Enumerator[Array[Byte]] = result.body

// TODO: handle connection KeepAlive and Close?
val connection = result.connection

Logger("play").trace("Sending simple result: " + result)

httpResponse.setStatus(status)

setHeaders(headers, httpResponse)

val withContentLength = headers.exists(_._1 == CONTENT_LENGTH)
val chunked = headers.exists {
case (key, value) => key == HeaderNames.TRANSFER_ENCODING && value == HttpProtocol.CHUNKED
}

// TODO do not allow chunked for http 1.0?
// if (chunked && connection == KeepAlive) { send Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.") }

// Stream the result
if (withContentLength || chunked) {

val hasError: AtomicBoolean = new AtomicBoolean(false)

val bodyIteratee: Iteratee[Array[Byte], Unit] = {

def step(in: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = (!hasError.get, in) match {
case (true, Input.El(x)) =>
Iteratee.flatten(
Future.successful(
if (hasError.get) {
()
} else {
getHttpResponse().getRichOutputStream.map {
os =>
os.write(x)
os.flush()
}
})
.extend1 {
case Redeemed(_) => if (!hasError.get) Cont(step) else Done((), Input.Empty)
case Thrown(ex) =>
hasError.set(true)
Logger("play").debug(ex.toString)
throw ex
})
case (true, Input.Empty) => Cont(step)
case (_, inp) => Done((), inp)
}
Iteratee.flatten(
Future.successful(())
.map(_ => if (!hasError.get) Cont(step) else Done((), Input.Empty: Input[Array[Byte]])))
}

val bodyConsumer = if (chunked) {
// if the result body is chunked, the chunks are already encoded with metadata in Results.chunk
// The problem is that the servlet container adds metadata again, leading the chunks encoded 2 times.
// As workaround, we 'dechunk' the body one time before sending it to the servlet container
body &> Results.dechunk |>>> bodyIteratee
} else {
body |>>> bodyIteratee
}
bodyConsumer.extend1 {
case Redeemed(_) =>
cleanup()
onHttpResponseComplete()
case Thrown(ex) =>
Logger("play").debug(ex.toString)
hasError.set(true)
onHttpResponseComplete()
}
} else {
Logger("play").trace("Result without Content-length")

// No Content-Length header specified, buffer in-memory
val byteBuffer = new ByteArrayOutputStream
val byteArrayOSIteratee = Iteratee.fold(byteBuffer)((b, e: Array[Byte]) => {
b.write(e); b
})

val p = body |>>> Enumeratee.grouped(byteArrayOSIteratee) &>> Cont {
case Input.El(buffer) =>
Logger("play").trace("Buffer size to send: " + buffer.size)
getHttpResponse().getHttpServletResponse.map { response =>
// set the content length ourselves
response.setContentLength(buffer.size)
val os = response.getOutputStream
os.flush()
buffer.writeTo(os)
}
val p = Future.successful()
Iteratee.flatten(p.map(_ => Done(1, Input.Empty: Input[ByteArrayOutputStream])))

case other => Error("unexpected input", other)
}
p.extend1 {
case Redeemed(_) =>
cleanup()
onHttpResponseComplete()
case Thrown(ex) =>
Logger("play").debug(ex.toString)
onHttpResponseComplete()
}
}

} // end match foreach

}.map { _ => cleanup() }
}
}

/**
Expand Down Expand Up @@ -167,137 +308,6 @@ abstract class Play2GenericServletRequestHandler(val servletRequest: HttpServlet
def handle(result: SimpleResult): Unit
}

// converting servlet response to play's
val response = new Response {

def handle(result: SimpleResult): Unit = {

getHttpResponse().getHttpServletResponse.foreach { httpResponse =>

val status = result.header.status
val headers = result.header.headers
val body = result.body

// TODO: handle connection KeepAlive and Close?
val connection = result.connection


import play.api.libs.iteratee.Execution.Implicits.trampoline

Logger("play").trace("Sending simple result: " + result)

httpResponse.setStatus(status)

// Set response headers
headers.filterNot(_ == (CONTENT_LENGTH, "-1")).foreach {

// Fix a bug for Set-Cookie header.
// Multiple cookies could be merged in a single header
// but it's not properly supported by some browsers
case (name @ play.api.http.HeaderNames.SET_COOKIE, value) => {
getServletCookies(value).map {
c => httpResponse.addCookie(c)
}
}

case (name, value) => httpResponse.setHeader(name, value)
}

val withContentLength = headers.exists ( _._1 == CONTENT_LENGTH)
val chunked = headers.exists { case (key, value) => key == HeaderNames.TRANSFER_ENCODING && value == HttpProtocol.CHUNKED }

// TODO do not allow chunked for http 1.0?
// if (chunked && connection == KeepAlive) { send Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.") }

// Stream the result
if (withContentLength || chunked) {

val hasError: AtomicBoolean = new AtomicBoolean(false)

val bodyIteratee: Iteratee[Array[Byte], Unit] = {

def step(in: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = (!hasError.get, in) match {
case (true, Input.El(x)) =>
Iteratee.flatten(
Future.successful(
if (hasError.get) {
()
} else {
getHttpResponse().getRichOutputStream.map { os =>
os.write(x)
os.flush()
}
})
.extend1 {
case Redeemed(_) => if (!hasError.get) Cont(step) else Done((), Input.Empty)
case Thrown(ex) =>
hasError.set(true)
Logger("play").debug(ex.toString)
throw ex
})
case (true, Input.Empty) => Cont(step)
case (_, inp) => Done((), inp)
}
Iteratee.flatten(
Future.successful(())
.map(_ => if (!hasError.get) Cont(step) else Done((), Input.Empty: Input[Array[Byte]])))
}

val bodyConsumer = if (chunked) {
// if the result body is chunked, the chunks are already encoded with metadata in Results.chunk
// The problem is that the servlet container add metadata again, leading the chunks encoded 2 times.
// As workaround, we 'dechunk' the body one time before sending it to the servlet container
body &> Results.dechunk |>>> bodyIteratee
} else {
body |>>> bodyIteratee
}
bodyConsumer.extend1 {
case Redeemed(_) =>
cleanup()
onHttpResponseComplete()
case Thrown(ex) =>
Logger("play").debug(ex.toString)
hasError.set(true)
onHttpResponseComplete()
}
} else {
Logger("play").trace("Result without Content-length")

// No Content-Length header specified, buffer in-memory
val byteBuffer = new ByteArrayOutputStream
val writer: Function2[ByteArrayOutputStream, Array[Byte], Unit] = (b, x) => b.write(x)
val byteArrayOSIteratee = Iteratee.fold(byteBuffer)((b, e: Array[Byte]) => { writer(b, e); b })

val p = body |>>> Enumeratee.grouped(byteArrayOSIteratee) &>> Cont {
case Input.El(buffer) =>
Logger("play").trace("Buffer size to send: " + buffer.size)
getHttpResponse().getRichOutputStream.map { os =>
// set the content length ourselves
getHttpResponse().getHttpServletResponse.map(_.setContentLength(buffer.size))
os.flush()
buffer.writeTo(os)
}
val p = Future.successful()
Iteratee.flatten(p.map(_ => Done(1, Input.Empty: Input[ByteArrayOutputStream])))

case other => Error("unexpected input", other)
}
p.extend1 {
case Redeemed(_) =>
cleanup()
onHttpResponseComplete()
case Thrown(ex) =>
Logger("play").debug(ex.toString)
onHttpResponseComplete()
}
}

} // end match foreach

} // end handle method

}

def cleanFlashCookie(result: SimpleResult): SimpleResult = {
val header = result.header

Expand Down Expand Up @@ -357,19 +367,14 @@ abstract class Play2GenericServletRequestHandler(val servletRequest: HttpServlet

val eventuallyResult: Future[SimpleResult] = feedBodyParser(bodyParser)

val sent = eventuallyResult.recoverWith {
val eventuallyResultWithError = eventuallyResult.recoverWith {
case error =>
Logger("play").error("Cannot invoke the action, eventually got an error: " + error)
app.map(_.handleError(requestHeader, error))
.getOrElse(DefaultGlobal.onError(requestHeader, error))
}.map { result =>
response.handle(cleanFlashCookie(result))
}
}.map { result => cleanFlashCookie(result) }

// Finally, clean up
sent.map { _ =>
cleanup()
}
pushPlayResultToServletOS(eventuallyResultWithError, cleanup)
}

onFinishService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Play2Servlet30RequestHandler(servletRequest: HttpServletRequest)
}

protected override def onHttpResponseComplete() = {
asyncContext.complete
asyncContext.complete()
}

protected override def getHttpRequest(): RichHttpServletRequest = {
Expand Down
Loading

0 comments on commit 76f494a

Please sign in to comment.