From 0b769b73fb7ae314325857138a2d3138ed157908 Mon Sep 17 00:00:00 2001 From: David Lemieux Date: Wed, 28 May 2014 15:50:35 -0700 Subject: [PATCH] Spark 1916 The changes could be ported back to 0.9 as well. Changing in.read to in.readFully to read the whole input stream rather than the first 1020 bytes. This should ok considering that Flume caps the body size to 32K by default. Author: David Lemieux Closes #865 from lemieud/SPARK-1916 and squashes the following commits: a265673 [David Lemieux] Updated SparkFlumeEvent to read the whole stream rather than the first X bytes. --- .../org/apache/spark/streaming/flume/FlumeInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index df7605fe579f8..5be33f1d5c428 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -63,7 +63,7 @@ class SparkFlumeEvent() extends Externalizable { def readExternal(in: ObjectInput) { val bodyLength = in.readInt() val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) + in.readFully(bodyBuff) val numHeaders = in.readInt() val headers = new java.util.HashMap[CharSequence, CharSequence]