diff --git a/project-code/core/common/src/main/scala/play/core/server/servlet/RequestHandler.scala b/project-code/core/common/src/main/scala/play/core/server/servlet/RequestHandler.scala index 56893b7..f8c2b0d 100644 --- a/project-code/core/common/src/main/scala/play/core/server/servlet/RequestHandler.scala +++ b/project-code/core/common/src/main/scala/play/core/server/servlet/RequestHandler.scala @@ -91,6 +91,147 @@ trait HttpServletRequestHandler extends RequestHandler { bodyEnumerator |>>> bodyParser } + protected def setHeaders(headers: Map[String, String], httpResponse: HttpServletResponse): Unit = { + // Set response headers + headers.filterNot(_ ==(CONTENT_LENGTH, "-1")).foreach { + + // Fix a bug for Set-Cookie header. + // Multiple cookies could be merged in a single header + // but it's not properly supported by some browsers + case (name@play.api.http.HeaderNames.SET_COOKIE, value) => { + getServletCookies(value).map { + c => httpResponse.addCookie(c) + } + } + + case (name, value) => httpResponse.setHeader(name, value) + } + } + + /** + * default implementation to push a play result to the servlet output stream + * @param futureResult the result of the play action + * @param cleanup clean up callback + */ + protected def pushPlayResultToServletOS(futureResult: Future[SimpleResult], cleanup: () => Unit): Unit = { + // TODO: should use the servlet thread here or use special thread pool for blocking IO operations + // (https://github.com/dlecan/play2-war-plugin/issues/223) + import play.api.libs.iteratee.Execution.Implicits.trampoline + + futureResult.map { result => + getHttpResponse().getHttpServletResponse.foreach { httpResponse => + + val status = result.header.status + val headers = result.header.headers + val body: Enumerator[Array[Byte]] = result.body + + // TODO: handle connection KeepAlive and Close? + val connection = result.connection + + Logger("play").trace("Sending simple result: " + result) + + httpResponse.setStatus(status) + + setHeaders(headers, httpResponse) + + val withContentLength = headers.exists(_._1 == CONTENT_LENGTH) + val chunked = headers.exists { + case (key, value) => key == HeaderNames.TRANSFER_ENCODING && value == HttpProtocol.CHUNKED + } + + // TODO do not allow chunked for http 1.0? + // if (chunked && connection == KeepAlive) { send Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.") } + + // Stream the result + if (withContentLength || chunked) { + + val hasError: AtomicBoolean = new AtomicBoolean(false) + + val bodyIteratee: Iteratee[Array[Byte], Unit] = { + + def step(in: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = (!hasError.get, in) match { + case (true, Input.El(x)) => + Iteratee.flatten( + Future.successful( + if (hasError.get) { + () + } else { + getHttpResponse().getRichOutputStream.map { + os => + os.write(x) + os.flush() + } + }) + .extend1 { + case Redeemed(_) => if (!hasError.get) Cont(step) else Done((), Input.Empty) + case Thrown(ex) => + hasError.set(true) + Logger("play").debug(ex.toString) + throw ex + }) + case (true, Input.Empty) => Cont(step) + case (_, inp) => Done((), inp) + } + Iteratee.flatten( + Future.successful(()) + .map(_ => if (!hasError.get) Cont(step) else Done((), Input.Empty: Input[Array[Byte]]))) + } + + val bodyConsumer = if (chunked) { + // if the result body is chunked, the chunks are already encoded with metadata in Results.chunk + // The problem is that the servlet container adds metadata again, leading the chunks encoded 2 times. + // As workaround, we 'dechunk' the body one time before sending it to the servlet container + body &> Results.dechunk |>>> bodyIteratee + } else { + body |>>> bodyIteratee + } + bodyConsumer.extend1 { + case Redeemed(_) => + cleanup() + onHttpResponseComplete() + case Thrown(ex) => + Logger("play").debug(ex.toString) + hasError.set(true) + onHttpResponseComplete() + } + } else { + Logger("play").trace("Result without Content-length") + + // No Content-Length header specified, buffer in-memory + val byteBuffer = new ByteArrayOutputStream + val byteArrayOSIteratee = Iteratee.fold(byteBuffer)((b, e: Array[Byte]) => { + b.write(e); b + }) + + val p = body |>>> Enumeratee.grouped(byteArrayOSIteratee) &>> Cont { + case Input.El(buffer) => + Logger("play").trace("Buffer size to send: " + buffer.size) + getHttpResponse().getHttpServletResponse.map { response => + // set the content length ourselves + response.setContentLength(buffer.size) + val os = response.getOutputStream + os.flush() + buffer.writeTo(os) + } + val p = Future.successful() + Iteratee.flatten(p.map(_ => Done(1, Input.Empty: Input[ByteArrayOutputStream]))) + + case other => Error("unexpected input", other) + } + p.extend1 { + case Redeemed(_) => + cleanup() + onHttpResponseComplete() + case Thrown(ex) => + Logger("play").debug(ex.toString) + onHttpResponseComplete() + } + } + + } // end match foreach + + }.map { _ => cleanup() } + } } /** @@ -167,137 +308,6 @@ abstract class Play2GenericServletRequestHandler(val servletRequest: HttpServlet def handle(result: SimpleResult): Unit } - // converting servlet response to play's - val response = new Response { - - def handle(result: SimpleResult): Unit = { - - getHttpResponse().getHttpServletResponse.foreach { httpResponse => - - val status = result.header.status - val headers = result.header.headers - val body = result.body - - // TODO: handle connection KeepAlive and Close? - val connection = result.connection - - - import play.api.libs.iteratee.Execution.Implicits.trampoline - - Logger("play").trace("Sending simple result: " + result) - - httpResponse.setStatus(status) - - // Set response headers - headers.filterNot(_ == (CONTENT_LENGTH, "-1")).foreach { - - // Fix a bug for Set-Cookie header. - // Multiple cookies could be merged in a single header - // but it's not properly supported by some browsers - case (name @ play.api.http.HeaderNames.SET_COOKIE, value) => { - getServletCookies(value).map { - c => httpResponse.addCookie(c) - } - } - - case (name, value) => httpResponse.setHeader(name, value) - } - - val withContentLength = headers.exists ( _._1 == CONTENT_LENGTH) - val chunked = headers.exists { case (key, value) => key == HeaderNames.TRANSFER_ENCODING && value == HttpProtocol.CHUNKED } - - // TODO do not allow chunked for http 1.0? - // if (chunked && connection == KeepAlive) { send Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.") } - - // Stream the result - if (withContentLength || chunked) { - - val hasError: AtomicBoolean = new AtomicBoolean(false) - - val bodyIteratee: Iteratee[Array[Byte], Unit] = { - - def step(in: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = (!hasError.get, in) match { - case (true, Input.El(x)) => - Iteratee.flatten( - Future.successful( - if (hasError.get) { - () - } else { - getHttpResponse().getRichOutputStream.map { os => - os.write(x) - os.flush() - } - }) - .extend1 { - case Redeemed(_) => if (!hasError.get) Cont(step) else Done((), Input.Empty) - case Thrown(ex) => - hasError.set(true) - Logger("play").debug(ex.toString) - throw ex - }) - case (true, Input.Empty) => Cont(step) - case (_, inp) => Done((), inp) - } - Iteratee.flatten( - Future.successful(()) - .map(_ => if (!hasError.get) Cont(step) else Done((), Input.Empty: Input[Array[Byte]]))) - } - - val bodyConsumer = if (chunked) { - // if the result body is chunked, the chunks are already encoded with metadata in Results.chunk - // The problem is that the servlet container add metadata again, leading the chunks encoded 2 times. - // As workaround, we 'dechunk' the body one time before sending it to the servlet container - body &> Results.dechunk |>>> bodyIteratee - } else { - body |>>> bodyIteratee - } - bodyConsumer.extend1 { - case Redeemed(_) => - cleanup() - onHttpResponseComplete() - case Thrown(ex) => - Logger("play").debug(ex.toString) - hasError.set(true) - onHttpResponseComplete() - } - } else { - Logger("play").trace("Result without Content-length") - - // No Content-Length header specified, buffer in-memory - val byteBuffer = new ByteArrayOutputStream - val writer: Function2[ByteArrayOutputStream, Array[Byte], Unit] = (b, x) => b.write(x) - val byteArrayOSIteratee = Iteratee.fold(byteBuffer)((b, e: Array[Byte]) => { writer(b, e); b }) - - val p = body |>>> Enumeratee.grouped(byteArrayOSIteratee) &>> Cont { - case Input.El(buffer) => - Logger("play").trace("Buffer size to send: " + buffer.size) - getHttpResponse().getRichOutputStream.map { os => - // set the content length ourselves - getHttpResponse().getHttpServletResponse.map(_.setContentLength(buffer.size)) - os.flush() - buffer.writeTo(os) - } - val p = Future.successful() - Iteratee.flatten(p.map(_ => Done(1, Input.Empty: Input[ByteArrayOutputStream]))) - - case other => Error("unexpected input", other) - } - p.extend1 { - case Redeemed(_) => - cleanup() - onHttpResponseComplete() - case Thrown(ex) => - Logger("play").debug(ex.toString) - onHttpResponseComplete() - } - } - - } // end match foreach - - } // end handle method - - } - def cleanFlashCookie(result: SimpleResult): SimpleResult = { val header = result.header @@ -357,19 +367,14 @@ abstract class Play2GenericServletRequestHandler(val servletRequest: HttpServlet val eventuallyResult: Future[SimpleResult] = feedBodyParser(bodyParser) - val sent = eventuallyResult.recoverWith { + val eventuallyResultWithError = eventuallyResult.recoverWith { case error => Logger("play").error("Cannot invoke the action, eventually got an error: " + error) app.map(_.handleError(requestHeader, error)) .getOrElse(DefaultGlobal.onError(requestHeader, error)) - }.map { result => - response.handle(cleanFlashCookie(result)) - } + }.map { result => cleanFlashCookie(result) } - // Finally, clean up - sent.map { _ => - cleanup() - } + pushPlayResultToServletOS(eventuallyResultWithError, cleanup) } onFinishService() diff --git a/project-code/core/servlet30/src/main/scala/RequestHandler30.scala b/project-code/core/servlet30/src/main/scala/RequestHandler30.scala index e380aa5..de311d1 100644 --- a/project-code/core/servlet30/src/main/scala/RequestHandler30.scala +++ b/project-code/core/servlet30/src/main/scala/RequestHandler30.scala @@ -41,7 +41,7 @@ class Play2Servlet30RequestHandler(servletRequest: HttpServletRequest) } protected override def onHttpResponseComplete() = { - asyncContext.complete + asyncContext.complete() } protected override def getHttpRequest(): RichHttpServletRequest = { diff --git a/project-code/core/servlet31/src/main/scala/play/core/server/servlet31/RequestHandler31.scala b/project-code/core/servlet31/src/main/scala/play/core/server/servlet31/RequestHandler31.scala index 7bd1e1f..9e67fdc 100644 --- a/project-code/core/servlet31/src/main/scala/play/core/server/servlet31/RequestHandler31.scala +++ b/project-code/core/servlet31/src/main/scala/play/core/server/servlet31/RequestHandler31.scala @@ -4,12 +4,13 @@ import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import play.core.server.servlet.{RichHttpServletResponse, RichHttpServletRequest, Play2GenericServletRequestHandler} import java.io.{OutputStream, InputStream} import java.util.concurrent.atomic.AtomicBoolean -import javax.servlet.{ServletInputStream, ReadListener, AsyncEvent} +import javax.servlet._ import play.api.Logger import play.api.libs.iteratee._ import scala.concurrent.{Future, Promise} +import play.api.mvc.{Results, SimpleResult} import play.api.libs.iteratee.Input.El -import play.api.mvc.SimpleResult +import play.api.http.{HttpProtocol, HeaderNames} class Play2Servlet31RequestHandler(servletRequest: HttpServletRequest) extends Play2GenericServletRequestHandler(servletRequest, None) @@ -143,6 +144,109 @@ class Play2Servlet31RequestHandler(servletRequest: HttpServletRequest) val servletInputStream = servletRequest.getInputStream readServletRequest(servletInputStream, bodyParser) } + + /** + * push the result of a play action asynchronously to a servlet output stream + * @param httpResponse servlet response + * @param out servlet output stream + * @param futureResult result of a play action + * @param cleanup clean up callback + */ + private class ResultWriteListener( + val httpResponse: HttpServletResponse, + val out: ServletOutputStream, + val futureResult: Future[SimpleResult], + val cleanup: () => Unit) extends WriteListener { + + // the promise is completed when a write to the servlet IO is possible + @volatile var iterateeP = Promise[Iteratee[Array[Byte], Unit]]() + val futureIteratee = iterateeP.future + Logger("play.war.servlet31").trace("set write listener") + + private def step(): Iteratee[Array[Byte], Unit] = { + Logger("play.war.servlet31").trace(s"step") + iterateeP = Promise[Iteratee[Array[Byte], Unit]]() + + Cont[Array[Byte], Unit] { + case Input.EOF => + Logger("play.war.servlet31").trace(s"EOF, finished!") + onHttpResponseComplete() + cleanup() + Done[Array[Byte], Unit](Unit) + + case Input.Empty => + Logger("play.war.servlet31").trace(s"empty, just continue") + step() + + case Input.El(buffer) => + out.write(buffer) + if (out.isReady) { + out.flush() + } + Logger("play.war.servlet31").trace(s"send ${buffer.length} bytes. out.isReady=${out.isReady}") + if (out.isReady) { + // can immediately push the next bytes + step() + } else { + // wait for next onWritePossible + Iteratee.flatten(iterateeP.future) + } + } + } + + override def onWritePossible(): Unit = { + Logger("play.war.servlet31").trace("onWritePossible - begin") + if (iterateeP.isCompleted) { + throw new Exception("race condition: the servlet container should not call onWritePossible() when the iteratee is completed. Please report.") + } + + // write is possible, let's use it + iterateeP.success(step()) + } + + override def onError(t: Throwable): Unit = { + Logger("play.war.servlet31").error("error while writing result to servlet output stream", t) + onHttpResponseComplete() + cleanup() + } + + import play.core.Execution.Implicits.internalContext + + futureIteratee.foreach { bodyIteratee => + futureResult.foreach { result => + val status = result.header.status + val headers = result.header.headers + + Logger("play.war.servlet31").trace("Sending simple result: " + result) + + httpResponse.setStatus(status) + + setHeaders(headers, httpResponse) + + val chunked = headers.exists { case (key, value) => key == HeaderNames.TRANSFER_ENCODING && value == HttpProtocol.CHUNKED } + + Logger("play.war.servlet31").trace(s"the body iteratee is ready. chunked=$chunked") + if (chunked) { + // if the result body is chunked, the chunks are already encoded with metadata in Results.chunk + // The problem is that the servlet container adds metadata again, leading the chunks encoded 2 times. + // As workaround, we 'dechunk' the body one time before sending it to the servlet container + result.body &> Results.dechunk |>>> bodyIteratee + } else { + result.body |>>> bodyIteratee + } + } + } + } + + override protected def pushPlayResultToServletOS(futureResult: Future[SimpleResult], cleanup: () => Unit): Unit = { + getHttpResponse().getHttpServletResponse map { httpResponse => + + val out = httpResponse.getOutputStream.asInstanceOf[ServletOutputStream] + + // tomcat insists that the WriteListener is set on the servlet thread. + out.setWriteListener(new ResultWriteListener(httpResponse, out, futureResult, cleanup)) + } + } } private[servlet31] class AsyncListener(val requestId: String) extends javax.servlet.AsyncListener {