[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python #3715
Changes from 7 commits
File: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -313,6 +313,7 @@ private object SpecialLengths {
   val PYTHON_EXCEPTION_THROWN = -2
   val TIMING_DATA = -3
   val END_OF_STREAM = -4
+  val NULL = -5
 }

 private[spark] object PythonRDD extends Logging {

@@ -374,49 +375,61 @@ private[spark] object PythonRDD extends Logging {
     // The right way to implement this would be to use TypeTags to get the full
     // type of T. Since I don't want to introduce breaking changes throughout the
     // entire Spark API, I have to use this hacky approach:

Review comment: I believe that these three comment lines originally referred to the code on line 377 (now line 395), where we look at the runtime type of the first element of the iterator. For clarity's sake, I think we should move the new …
Reply: fixed

+    def write(bytes: Array[Byte]) {
+      if (bytes == null) {
+        dataOut.writeInt(SpecialLengths.NULL)
+      } else {
+        dataOut.writeInt(bytes.length)
+        dataOut.write(bytes)
+      }
+    }

Review comment: And similarly, maybe call this writeBytes?

+    def writeS(str: String) {
+      if (str == null) {
+        dataOut.writeInt(SpecialLengths.NULL)
+      } else {
+        writeUTF(str, dataOut)
+      }
+    }

Review comment: new line missing.
Review comment: Maybe call this …

     if (iter.hasNext) {
       val first = iter.next()
       val newIter = Seq(first).iterator ++ iter
       first match {
         case arr: Array[Byte] =>
-          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
-          }
+          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach(write)
         case string: String =>
-          newIter.asInstanceOf[Iterator[String]].foreach { str =>
-            writeUTF(str, dataOut)
-          }
+          newIter.asInstanceOf[Iterator[String]].foreach(writeS)
         case stream: PortableDataStream =>
           newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
-            val bytes = stream.toArray()
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
+            write(stream.toArray())
           }
         case (key: String, stream: PortableDataStream) =>
           newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
             case (key, stream) =>
-              writeUTF(key, dataOut)
-              val bytes = stream.toArray()
-              dataOut.writeInt(bytes.length)
-              dataOut.write(bytes)
+              writeS(key)
+              write(stream.toArray())
           }
         case (key: String, value: String) =>
           newIter.asInstanceOf[Iterator[(String, String)]].foreach {
             case (key, value) =>
-              writeUTF(key, dataOut)
-              writeUTF(value, dataOut)
+              writeS(key)
+              writeS(value)
           }
         case (key: Array[Byte], value: Array[Byte]) =>
           newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
             case (key, value) =>
-              dataOut.writeInt(key.length)
-              dataOut.write(key)
-              dataOut.writeInt(value.length)
-              dataOut.write(value)
+              write(key)
+              write(value)
           }
+        // key is null
+        case (null, v:Array[Byte]) =>

Review comment: Minor style nit: I think there should be a space after …
Review comment: nit: Also, for consistency with other "cases", v --> value.

+          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
+            case (key, value) =>
+              write(key)
+              write(value)
+          }
+
         case other =>
-          throw new SparkException("Unexpected element type " + first.getClass)
+          throw new SparkException("Unexpected element type " + other.getClass)
       }
     }
   }
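To make the new wire format concrete: each element sent from the JVM to the Python worker is framed as a 4-byte big-endian length followed by the payload, and this patch adds -5 (SpecialLengths.NULL) as a length sentinel that stands for a null element. The sketch below imitates that framing in plain Python; write_frame and read_frame are illustrative names, not part of PySpark.

import io
import struct

NULL = -5  # mirrors SpecialLengths.NULL on both the Scala and Python sides

def write_frame(out, payload):
    # None is encoded as the NULL sentinel instead of a length
    if payload is None:
        out.write(struct.pack("!i", NULL))
    else:
        out.write(struct.pack("!i", len(payload)))
        out.write(payload)

def read_frame(stream):
    # read the 4-byte length; the NULL sentinel maps back to None
    length = struct.unpack("!i", stream.read(4))[0]
    if length == NULL:
        return None
    return stream.read(length)

buf = io.BytesIO()
write_frame(buf, b"kafka-value")
write_frame(buf, None)          # e.g. a Kafka record whose producer set no key
buf.seek(0)
assert read_frame(buf) == b"kafka-value"
assert read_frame(buf) is None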
File: examples/src/main/python/streaming/kafka_wordcount.py
@@ -0,0 +1,56 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: kafka_wordcount.py <zk> <topic>

To run this on your local machine, you need to setup Kafka and create a producer first
 $ bin/zookeeper-server-start.sh config/zookeeper.properties
 $ bin/kafka-server-start.sh config/server.properties
 $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Review comment: All the above commands are meant to be run from the Kafka bin/ directory?
Reply: Good point, I will remove these and put a link here.

and then run the example
 `$ bin/spark-submit --driver-class-path lib_managed/jars/kafka_*.jar:\
 external/kafka/target/scala-*/spark-streaming-kafka_*.jar examples/src/main/python/\
 streaming/kafka_wordcount.py localhost:2181 test`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
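The transformation chain in the example is the standard word count; the only Kafka-specific parts are the createStream call and taking the message value with x[1]. As a quick sanity check of the per-batch logic, the same chain can be run on a static RDD in a local PySpark session. This is only an illustration, not part of the example file, and the master/app name used here are arbitrary.

from pyspark import SparkContext

sc = SparkContext("local[2]", "WordCountSanityCheck")
lines = sc.parallelize(["spark streaming kafka", "kafka streaming"])
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
print counts.collect()   # e.g. [('spark', 1), ('kafka', 2), ('streaming', 2)] (order may vary)
sc.stop()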
File: python/pyspark/serializers.py
@@ -70,6 +70,7 @@ class SpecialLengths(object):
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
+   NULL = -5


 class Serializer(object):

@@ -145,8 +146,10 @@ def _read_with_length(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
+       if length == SpecialLengths.NULL:
+           return None
        obj = stream.read(length)
-       if obj == "":
+       if len(obj) < length:
            raise EOFError
        return self.loads(obj)


@@ -480,6 +483,8 @@ def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
+       if length == SpecialLengths.NULL:
+           return None

Review comment: This …

        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s
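Besides the NULL sentinel, the _read_with_length hunk also tightens the EOF check: the old `if obj == ""` only caught a completely empty read, while `if len(obj) < length` also catches a stream that ends partway through an object. A minimal illustration of the difference, using an in-memory stream in place of the worker socket:

import io

stream = io.BytesIO(b"abc")   # the peer promised 10 bytes but the connection ended early
length = 10
obj = stream.read(length)      # returns b"abc" -- a short read, not an empty one

assert obj != b""              # the old check (obj == "") would not raise here
assert len(obj) < length       # the new check detects the truncated object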
File: python/pyspark/streaming/kafka.py
@@ -0,0 +1,83 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from py4j.java_collections import MapConverter
from py4j.java_gateway import java_import, Py4JError

from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming import DStream

__all__ = ['KafkaUtils', 'utf8_decoder']


def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    return s and s.decode('utf-8')


class KafkaUtils(object):

    @staticmethod
    def createStream(ssc, zkQuorum, groupId, topics,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):

Review comment: I think we should have an interface where arbitrary Kafka parameters can be added.

        """
        Create an input stream that pulls messages from a Kafka Broker.

        :param ssc: StreamingContext object
        :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId: The group id for this consumer.
        :param topics: Dict of (topic_name -> numPartitions) to consume.
                       Each partition is consumed in its own thread.
        :param storageLevel: RDD storage level.
        :param keyDecoder: A function used to decode key

Review comment: Shouldn't we mention what the default decoder is?

        :param valueDecoder: A function used to decode value
        :return: A DStream object
        """
        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")

        param = {
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
        }

Review comment: Why is this automatically added?
Reply: Ignore this. This is also present in the Scala code, so let it be here.

        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
        jparam = MapConverter().convert(param, ssc.sparkContext._gateway._gateway_client)
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

        def getClassByName(name):
            return ssc._jvm.org.apache.spark.util.Utils.classForName(name)

        try:
            array = getClassByName("[B")
            decoder = getClassByName("kafka.serializer.DefaultDecoder")
            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
                                                       jparam, jtopics, jlevel)
        except Py4JError, e:
            # TODO: use --jar once it also work on driver
            if not e.message or 'call a package' in e.message:
                print "No kafka package, please build it and add it into classpath:"
                print "  $ sbt/sbt streaming-kafka/package"
                print "  $ bin/spark-submit --driver-class-path lib_managed/jars/kafka_2.10-0.8.0.jar:" \
                      "external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
            raise e

Review comment: This is clever; the 'call a package' errors are really confusing to users, so this message is pretty helpful.
Review comment: The message that gets printed here is quite scary. It's easy to miss the real message. Is it possible to quit in such a way that the whole stack trace does not get printed, and instead exit gracefully after printing this message? Perhaps an exit? @JoshRosen Is that a good idea?

        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
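One review comment above asks for an interface that accepts arbitrary Kafka parameters rather than only zkQuorum and groupId. A hypothetical shape for that, sketched here for discussion only (the helper name, the extra argument, and the merge order are assumptions, not part of this patch), would be to merge a caller-supplied dict over the defaults before handing it to the JVM:

def build_kafka_params(zkQuorum, groupId, extra=None):
    # defaults currently hard-coded in createStream
    params = {
        "zookeeper.connect": zkQuorum,
        "group.id": groupId,
        "zookeeper.connection.timeout.ms": "10000",
    }
    # caller-supplied settings win over the defaults
    params.update(extra or {})
    return params

# e.g. start from the smallest offset without adding a new keyword argument per option
print build_kafka_params("localhost:2181", "spark-streaming-consumer",
                         {"auto.offset.reset": "smallest"})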
Comment: So this patch tries to fix a bug in Python regarding null values? If so, that probably should be a different patch from this Kafka patch.

Comment: @tdas I think that this same null-handling change has been proposed before, but until now I don't think we had a great reason to pull it in, since none of our internal APIs relied on it and we were worried that it might mask the presence of bugs. Now that we have a need for it, though, it might be okay to pull in here.

Comment: @JoshRosen I am more or less okay with this; I just fear any regressions in core PySpark that this would cause. I don't understand the implications well enough. @JoshRosen Do you have any thoughts on this?

Comment: Also, if this is a core Python change, there should be unit tests that capture this change.

Comment: Good point, I will do that.

Comment: Offline conversation: this change is related to SPARK-1630, which was attempted to be solved in #1551. Since that PR was rejected by @mateiz, it's better that he takes a look and comments on the validity of this change in this PR.

Comment: So what was the discussion here? Before, the motivation was that Nones from Python would be encoded as a pickled version of None. Is it that we now want to send nulls back, and we are not using the pickle library for Java that would map them? Then it would be okay.

Comment: For Kafka, each record is (K, V), but K could be null if not provided by the producer. We pass the bytes from the JVM to Python and do the deserialization in Python; no pickling is needed. So we need some way to tell whether the key is null or an empty string.

Comment: @mateiz could take a call on this.

Comment: Alright, it sounds good to allow nulls then.
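In user-facing terms, the sentinel is what lets a Kafka record produced without a key reach Python as None rather than being conflated with an empty key; the default utf8_decoder then passes None through untouched. A small plain-Python illustration (no JVM involved):

def utf8_decoder(s):
    # same helper as in pyspark/streaming/kafka.py: `s and ...` short-circuits on None
    return s and s.decode('utf-8')

assert utf8_decoder(None) is None          # record produced without a key -> key stays None
assert utf8_decoder(b'spark') == u'spark'  # normal keys are decoded to unicode
assert utf8_decoder(b'') == b''            # empty key stays empty, still distinguishable from None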