forked from play2war/play2-war-plugin
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
issue play2war#95: asynchron output for servlet 3.1
The handling of a play result in the default request handler is moved to pushPlayResultToServletOS. This handling stayed the same. This change should not have any impact on the existing servlet wrapper for servlet < 3.1. For servlet 3.1, pushPlayResultToServletOS is overwritten to provide a non-blocking asynchron handling based on the new WriteListener.
- Loading branch information
Showing
3 changed files
with
251 additions
and
142 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,6 +91,147 @@ trait HttpServletRequestHandler extends RequestHandler { | |
bodyEnumerator |>>> bodyParser | ||
} | ||
|
||
protected def setHeaders(headers: Map[String, String], httpResponse: HttpServletResponse): Unit = { | ||
// Set response headers | ||
headers.filterNot(_ ==(CONTENT_LENGTH, "-1")).foreach { | ||
|
||
// Fix a bug for Set-Cookie header. | ||
// Multiple cookies could be merged in a single header | ||
// but it's not properly supported by some browsers | ||
case (name@play.api.http.HeaderNames.SET_COOKIE, value) => { | ||
getServletCookies(value).map { | ||
c => httpResponse.addCookie(c) | ||
} | ||
} | ||
|
||
case (name, value) => httpResponse.setHeader(name, value) | ||
} | ||
} | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
yanns
Author
Owner
|
||
|
||
/** | ||
* default implementation to push a play result to the servlet output stream | ||
* @param futureResult the result of the play action | ||
* @param cleanup clean up callback | ||
*/ | ||
protected def pushPlayResultToServletOS(futureResult: Future[SimpleResult], cleanup: () => Unit): Unit = { | ||
// TODO: should use the servlet thread here or use special thread pool for blocking IO operations | ||
// (https://github.com/dlecan/play2-war-plugin/issues/223) | ||
import play.api.libs.iteratee.Execution.Implicits.trampoline | ||
|
||
futureResult.map { result => | ||
getHttpResponse().getHttpServletResponse.foreach { httpResponse => | ||
|
||
val status = result.header.status | ||
val headers = result.header.headers | ||
val body: Enumerator[Array[Byte]] = result.body | ||
|
||
// TODO: handle connection KeepAlive and Close? | ||
val connection = result.connection | ||
|
||
Logger("play").trace("Sending simple result: " + result) | ||
|
||
httpResponse.setStatus(status) | ||
|
||
setHeaders(headers, httpResponse) | ||
|
||
val withContentLength = headers.exists(_._1 == CONTENT_LENGTH) | ||
val chunked = headers.exists { | ||
case (key, value) => key == HeaderNames.TRANSFER_ENCODING && value == HttpProtocol.CHUNKED | ||
} | ||
|
||
// TODO do not allow chunked for http 1.0? | ||
// if (chunked && connection == KeepAlive) { send Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.") } | ||
|
||
// Stream the result | ||
if (withContentLength || chunked) { | ||
|
||
val hasError: AtomicBoolean = new AtomicBoolean(false) | ||
|
||
val bodyIteratee: Iteratee[Array[Byte], Unit] = { | ||
|
||
def step(in: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = (!hasError.get, in) match { | ||
case (true, Input.El(x)) => | ||
Iteratee.flatten( | ||
Future.successful( | ||
if (hasError.get) { | ||
() | ||
} else { | ||
getHttpResponse().getRichOutputStream.map { | ||
os => | ||
os.write(x) | ||
os.flush() | ||
} | ||
}) | ||
.extend1 { | ||
case Redeemed(_) => if (!hasError.get) Cont(step) else Done((), Input.Empty) | ||
case Thrown(ex) => | ||
hasError.set(true) | ||
Logger("play").debug(ex.toString) | ||
throw ex | ||
}) | ||
case (true, Input.Empty) => Cont(step) | ||
case (_, inp) => Done((), inp) | ||
} | ||
Iteratee.flatten( | ||
Future.successful(()) | ||
.map(_ => if (!hasError.get) Cont(step) else Done((), Input.Empty: Input[Array[Byte]]))) | ||
} | ||
|
||
val bodyConsumer = if (chunked) { | ||
// if the result body is chunked, the chunks are already encoded with metadata in Results.chunk | ||
// The problem is that the servlet container adds metadata again, leading the chunks encoded 2 times. | ||
// As workaround, we 'dechunk' the body one time before sending it to the servlet container | ||
body &> Results.dechunk |>>> bodyIteratee | ||
} else { | ||
body |>>> bodyIteratee | ||
} | ||
bodyConsumer.extend1 { | ||
case Redeemed(_) => | ||
cleanup() | ||
onHttpResponseComplete() | ||
case Thrown(ex) => | ||
Logger("play").debug(ex.toString) | ||
hasError.set(true) | ||
onHttpResponseComplete() | ||
} | ||
} else { | ||
Logger("play").trace("Result without Content-length") | ||
|
||
// No Content-Length header specified, buffer in-memory | ||
val byteBuffer = new ByteArrayOutputStream | ||
val byteArrayOSIteratee = Iteratee.fold(byteBuffer)((b, e: Array[Byte]) => { | ||
b.write(e); b | ||
}) | ||
|
||
val p = body |>>> Enumeratee.grouped(byteArrayOSIteratee) &>> Cont { | ||
case Input.El(buffer) => | ||
Logger("play").trace("Buffer size to send: " + buffer.size) | ||
getHttpResponse().getHttpServletResponse.map { response => | ||
// set the content length ourselves | ||
response.setContentLength(buffer.size) | ||
val os = response.getOutputStream | ||
os.flush() | ||
buffer.writeTo(os) | ||
} | ||
val p = Future.successful() | ||
Iteratee.flatten(p.map(_ => Done(1, Input.Empty: Input[ByteArrayOutputStream]))) | ||
|
||
case other => Error("unexpected input", other) | ||
} | ||
p.extend1 { | ||
case Redeemed(_) => | ||
cleanup() | ||
onHttpResponseComplete() | ||
case Thrown(ex) => | ||
Logger("play").debug(ex.toString) | ||
onHttpResponseComplete() | ||
} | ||
} | ||
|
||
} // end match foreach | ||
|
||
}.map { _ => cleanup() } | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -167,137 +308,6 @@ abstract class Play2GenericServletRequestHandler(val servletRequest: HttpServlet | |
def handle(result: SimpleResult): Unit | ||
} | ||
|
||
// converting servlet response to play's | ||
val response = new Response { | ||
|
||
def handle(result: SimpleResult): Unit = { | ||
|
||
getHttpResponse().getHttpServletResponse.foreach { httpResponse => | ||
|
||
val status = result.header.status | ||
val headers = result.header.headers | ||
val body = result.body | ||
|
||
// TODO: handle connection KeepAlive and Close? | ||
val connection = result.connection | ||
|
||
|
||
import play.api.libs.iteratee.Execution.Implicits.trampoline | ||
|
||
Logger("play").trace("Sending simple result: " + result) | ||
|
||
httpResponse.setStatus(status) | ||
|
||
// Set response headers | ||
headers.filterNot(_ == (CONTENT_LENGTH, "-1")).foreach { | ||
|
||
// Fix a bug for Set-Cookie header. | ||
// Multiple cookies could be merged in a single header | ||
// but it's not properly supported by some browsers | ||
case (name @ play.api.http.HeaderNames.SET_COOKIE, value) => { | ||
getServletCookies(value).map { | ||
c => httpResponse.addCookie(c) | ||
} | ||
} | ||
|
||
case (name, value) => httpResponse.setHeader(name, value) | ||
} | ||
|
||
val withContentLength = headers.exists ( _._1 == CONTENT_LENGTH) | ||
val chunked = headers.exists { case (key, value) => key == HeaderNames.TRANSFER_ENCODING && value == HttpProtocol.CHUNKED } | ||
|
||
// TODO do not allow chunked for http 1.0? | ||
// if (chunked && connection == KeepAlive) { send Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.") } | ||
|
||
// Stream the result | ||
if (withContentLength || chunked) { | ||
|
||
val hasError: AtomicBoolean = new AtomicBoolean(false) | ||
|
||
val bodyIteratee: Iteratee[Array[Byte], Unit] = { | ||
|
||
def step(in: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = (!hasError.get, in) match { | ||
case (true, Input.El(x)) => | ||
Iteratee.flatten( | ||
Future.successful( | ||
if (hasError.get) { | ||
() | ||
} else { | ||
getHttpResponse().getRichOutputStream.map { os => | ||
os.write(x) | ||
os.flush() | ||
} | ||
}) | ||
.extend1 { | ||
case Redeemed(_) => if (!hasError.get) Cont(step) else Done((), Input.Empty) | ||
case Thrown(ex) => | ||
hasError.set(true) | ||
Logger("play").debug(ex.toString) | ||
throw ex | ||
}) | ||
case (true, Input.Empty) => Cont(step) | ||
case (_, inp) => Done((), inp) | ||
} | ||
Iteratee.flatten( | ||
Future.successful(()) | ||
.map(_ => if (!hasError.get) Cont(step) else Done((), Input.Empty: Input[Array[Byte]]))) | ||
} | ||
|
||
val bodyConsumer = if (chunked) { | ||
// if the result body is chunked, the chunks are already encoded with metadata in Results.chunk | ||
// The problem is that the servlet container add metadata again, leading the chunks encoded 2 times. | ||
// As workaround, we 'dechunk' the body one time before sending it to the servlet container | ||
body &> Results.dechunk |>>> bodyIteratee | ||
} else { | ||
body |>>> bodyIteratee | ||
} | ||
bodyConsumer.extend1 { | ||
case Redeemed(_) => | ||
cleanup() | ||
onHttpResponseComplete() | ||
case Thrown(ex) => | ||
Logger("play").debug(ex.toString) | ||
hasError.set(true) | ||
onHttpResponseComplete() | ||
} | ||
} else { | ||
Logger("play").trace("Result without Content-length") | ||
|
||
// No Content-Length header specified, buffer in-memory | ||
val byteBuffer = new ByteArrayOutputStream | ||
val writer: Function2[ByteArrayOutputStream, Array[Byte], Unit] = (b, x) => b.write(x) | ||
val byteArrayOSIteratee = Iteratee.fold(byteBuffer)((b, e: Array[Byte]) => { writer(b, e); b }) | ||
|
||
val p = body |>>> Enumeratee.grouped(byteArrayOSIteratee) &>> Cont { | ||
case Input.El(buffer) => | ||
Logger("play").trace("Buffer size to send: " + buffer.size) | ||
getHttpResponse().getRichOutputStream.map { os => | ||
// set the content length ourselves | ||
getHttpResponse().getHttpServletResponse.map(_.setContentLength(buffer.size)) | ||
os.flush() | ||
buffer.writeTo(os) | ||
} | ||
val p = Future.successful() | ||
Iteratee.flatten(p.map(_ => Done(1, Input.Empty: Input[ByteArrayOutputStream]))) | ||
|
||
case other => Error("unexpected input", other) | ||
} | ||
p.extend1 { | ||
case Redeemed(_) => | ||
cleanup() | ||
onHttpResponseComplete() | ||
case Thrown(ex) => | ||
Logger("play").debug(ex.toString) | ||
onHttpResponseComplete() | ||
} | ||
} | ||
|
||
} // end match foreach | ||
|
||
} // end handle method | ||
|
||
} | ||
|
||
def cleanFlashCookie(result: SimpleResult): SimpleResult = { | ||
val header = result.header | ||
|
||
|
@@ -357,19 +367,14 @@ abstract class Play2GenericServletRequestHandler(val servletRequest: HttpServlet | |
|
||
val eventuallyResult: Future[SimpleResult] = feedBodyParser(bodyParser) | ||
|
||
val sent = eventuallyResult.recoverWith { | ||
val eventuallyResultWithError = eventuallyResult.recoverWith { | ||
case error => | ||
Logger("play").error("Cannot invoke the action, eventually got an error: " + error) | ||
app.map(_.handleError(requestHeader, error)) | ||
.getOrElse(DefaultGlobal.onError(requestHeader, error)) | ||
}.map { result => | ||
response.handle(cleanFlashCookie(result)) | ||
} | ||
}.map { result => cleanFlashCookie(result) } | ||
|
||
// Finally, clean up | ||
sent.map { _ => | ||
cleanup() | ||
} | ||
pushPlayResultToServletOS(eventuallyResultWithError, cleanup) | ||
} | ||
|
||
onFinishService() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
No need to create a new collection (filterNot), and don't use
map
whenforeach
is all you need (map
creates yet another collection):