Skip to content

Commit

Permalink
[SPARK-4606] Send EOF to child JVM when there's no more data to read.
Browse files Browse the repository at this point in the history
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#3460 from vanzin/SPARK-4606 and squashes the following commits:

031207d [Marcelo Vanzin] [SPARK-4606] Send EOF to child JVM when there's no more data to read.
  • Loading branch information
Marcelo Vanzin authored and JoshRosen committed Dec 24, 2014
1 parent 3f5f4cc commit 7e2deb7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
val isWindows = Utils.isWindows
val isSubprocess = sys.env.contains("IS_SUBPROCESS")
if (!isWindows) {
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
propagateEof = true)
stdinThread.start()
// Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
// broken pipe, signaling that the parent process has exited. This is the case if the
Expand Down
24 changes: 17 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1847,19 +1847,29 @@ private[spark] object Utils extends Logging {
/**
* A utility class to redirect the child process's stdout or stderr.
*/
private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
private[spark] class RedirectThread(
in: InputStream,
out: OutputStream,
name: String,
propagateEof: Boolean = false)
extends Thread(name) {

setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
out.write(buf, 0, len)
out.flush()
len = in.read(buf)
try {
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
out.write(buf, 0, len)
out.flush()
len = in.read(buf)
}
} finally {
if (propagateEof) {
out.close()
}
}
}
}
Expand Down

0 comments on commit 7e2deb7

Please sign in to comment.