[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python #3715

Closed · wants to merge 22 commits
Changes from 7 commits
55 changes: 34 additions & 21 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -313,6 +313,7 @@ private object SpecialLengths {
val PYTHON_EXCEPTION_THROWN = -2
val TIMING_DATA = -3
val END_OF_STREAM = -4
val NULL = -5
Contributor:
So this patch tries to fix a bug in Python regarding null values? If so, that probably should be a different patch from this Kafka patch.

Contributor:
@tdas I think that this same null-handling change has been proposed before but until now I don't think we had a great reason to pull it in, since none of our internal APIs relied on it and we were worried that it might mask the presence of bugs. Now that we have a need for it, though, it might be okay to pull in here.

Contributor:
@JoshRosen I am more or less okay with this; I just fear any regressions in core PySpark that this would cause, and I don't understand the implications well enough. Do you have any thoughts on this?

Contributor:
Also, if this is a core Python change, there should be unit tests that capture it.

Contributor Author:
Good point, I will do that.

Contributor:
Offline conversation: this change is related to SPARK-1630, which #1551 attempted to solve. Since that PR was rejected by @mateiz, it's better that he take a look and comment on the validity of this change in this PR.

Contributor:
So what was the discussion here? Before, the motivation was that Nones from Python would be encoded as a pickled version of None. Is it that we now want to send nulls back, and we are not using the pickle library for Java that would map them? Then it would be okay.

Contributor Author:
For Kafka, each record is (K, V), but K can be null if the producer did not provide one. We pass the raw bytes from the JVM to Python and do the deserialization on the Python side, with no pickling involved, so we need some way to tell whether the key is null or an empty string.
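
To make the framing concrete, here is a small standalone sketch (not Spark's actual PythonRDD or serializers code) of the length-prefixed protocol this patch extends: a length of -5, matching SpecialLengths.NULL in the diff, marks a null key or value, which keeps a missing Kafka key distinguishable from an empty byte string.

```python
import io
import struct

NULL = -5  # mirrors the SpecialLengths.NULL value added in this patch


def write_bytes(out, data):
    """Write a length-prefixed byte string; None becomes the NULL sentinel."""
    if data is None:
        out.write(struct.pack("!i", NULL))
    else:
        out.write(struct.pack("!i", len(data)))
        out.write(data)


def read_bytes(stream):
    """Read a length-prefixed byte string; the NULL sentinel becomes None."""
    length = struct.unpack("!i", stream.read(4))[0]
    if length == NULL:
        return None
    return stream.read(length)


# A Kafka record whose key was not set by the producer vs. an empty key:
buf = io.BytesIO()
write_bytes(buf, None)   # null key
write_bytes(buf, b"")    # empty key; must stay distinguishable from null
buf.seek(0)
assert read_bytes(buf) is None
assert read_bytes(buf) == b""
```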

Contributor:
@mateiz, could you take a call on this?

Contributor:
Alright, it sounds good to allow nulls then.

}

private[spark] object PythonRDD extends Logging {
@@ -374,49 +375,61 @@ private[spark] object PythonRDD extends Logging {
// The right way to implement this would be to use TypeTags to get the full
// type of T. Since I don't want to introduce breaking changes throughout the
// entire Spark API, I have to use this hacky approach:
Contributor:
I believe that these three comment lines originally referred to the code on line 377 (now line 395), where we look at the runtime type of the first element of the iterator. For clarity's sake, I think we should move the new write methods above this comment so that it stays in the right spot.

Contributor Author:
fixed

def write(bytes: Array[Byte]) {
Contributor:
And similarly, maybe call this writeBytes?

if (bytes == null) {
dataOut.writeInt(SpecialLengths.NULL)
} else {
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
}
def writeS(str: String) {
Contributor:
Newline missing between the two methods.

Contributor:
Maybe call this writeString? It's a little bit more verbose, but less prone to typos.

if (str == null) {
dataOut.writeInt(SpecialLengths.NULL)
} else {
writeUTF(str, dataOut)
}
}
if (iter.hasNext) {
val first = iter.next()
val newIter = Seq(first).iterator ++ iter
first match {
case arr: Array[Byte] =>
newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
newIter.asInstanceOf[Iterator[Array[Byte]]].foreach(write)
case string: String =>
newIter.asInstanceOf[Iterator[String]].foreach { str =>
writeUTF(str, dataOut)
}
newIter.asInstanceOf[Iterator[String]].foreach(writeS)
case stream: PortableDataStream =>
newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
write(stream.toArray())
}
case (key: String, stream: PortableDataStream) =>
newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
case (key, stream) =>
writeUTF(key, dataOut)
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
writeS(key)
write(stream.toArray())
}
case (key: String, value: String) =>
newIter.asInstanceOf[Iterator[(String, String)]].foreach {
case (key, value) =>
writeUTF(key, dataOut)
writeUTF(value, dataOut)
writeS(key)
writeS(value)
}
case (key: Array[Byte], value: Array[Byte]) =>
newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
case (key, value) =>
dataOut.writeInt(key.length)
dataOut.write(key)
dataOut.writeInt(value.length)
dataOut.write(value)
write(key)
write(value)
}
// key is null
case (null, v:Array[Byte]) =>
Contributor:
Minor style nit: I think there should be a space after v:

Contributor:
Nit: also, for consistency with the other cases, rename v to value.

newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
case (key, value) =>
write(key)
write(value)
}

case other =>
throw new SparkException("Unexpected element type " + first.getClass)
throw new SparkException("Unexpected element type " + other.getClass)
}
}
}
56 changes: 56 additions & 0 deletions examples/src/main/python/streaming/kafka_wordcount.py
@@ -0,0 +1,56 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: kafka_wordcount.py <zk> <topic>

To run this on your local machine, you need to set up Kafka and create a producer first
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Contributor Author:
Good point, I will remove these, and put a link here.

Contributor:
Are all the above commands meant to be run from Kafka's bin/? That creates some confusion about which directory is meant, since in all the Spark examples bin/ refers to Spark's bin/.

and then run the example
`$ bin/spark-submit --driver-class-path lib_managed/jars/kafka_*.jar:\
external/kafka/target/scala-*/spark-streaming-kafka_*.jar examples/src/main/python/\
streaming/kafka_wordcount.py localhost:2181 test`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: network_wordcount.py <zk> <topic>"
exit(-1)

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
7 changes: 6 additions & 1 deletion python/pyspark/serializers.py
@@ -70,6 +70,7 @@ class SpecialLengths(object):
PYTHON_EXCEPTION_THROWN = -2
TIMING_DATA = -3
END_OF_STREAM = -4
NULL = -5


class Serializer(object):
@@ -145,8 +146,10 @@ def _read_with_length(self, stream):
length = read_int(stream)
if length == SpecialLengths.END_OF_DATA_SECTION:
raise EOFError
if length == SpecialLengths.NULL:
return None
obj = stream.read(length)
if obj == "":
if len(obj) < length:
raise EOFError
return self.loads(obj)

@@ -480,6 +483,8 @@ def loads(self, stream):
length = read_int(stream)
if length == SpecialLengths.END_OF_DATA_SECTION:
raise EOFError
if length == SpecialLengths.NULL:
Contributor:
This if could be an elif, I guess.

return None
s = stream.read(length)
return s.decode("utf-8") if self.use_unicode else s

83 changes: 83 additions & 0 deletions python/pyspark/streaming/kafka.py
@@ -0,0 +1,83 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from py4j.java_collections import MapConverter
from py4j.java_gateway import java_import, Py4JError

from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming import DStream

__all__ = ['KafkaUtils', 'utf8_decoder']


def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    return s and s.decode('utf-8')


class KafkaUtils(object):

    @staticmethod
    def createStream(ssc, zkQuorum, groupId, topics,
Contributor:
I think we should have an interface where arbitrary Kafka parameters can be passed in.

                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kafka Broker.

        :param ssc: StreamingContext object
        :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId: The group id for this consumer.
        :param topics: Dict of (topic_name -> numPartitions) to consume.
                       Each partition is consumed in its own thread.
        :param storageLevel: RDD storage level.
        :param keyDecoder: A function used to decode key
Contributor:
Shouldn't we mention what the default decoder is?

        :param valueDecoder: A function used to decode value
        :return: A DStream object
        """
        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")

        param = {
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
Contributor:
Why is this automatically added?

Contributor:
Ignore this; it is also present in the Scala code, so let it be here.

        }
        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
        jparam = MapConverter().convert(param, ssc.sparkContext._gateway._gateway_client)
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

        def getClassByName(name):
            return ssc._jvm.org.apache.spark.util.Utils.classForName(name)

        try:
            array = getClassByName("[B")
            decoder = getClassByName("kafka.serializer.DefaultDecoder")
            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
                                                       jparam, jtopics, jlevel)
        except Py4JError, e:
            # TODO: use --jar once it also work on driver
            if not e.message or 'call a package' in e.message:
Contributor:
This is clever; the 'call a package' errors are really confusing to users, so this message is pretty helpful.

print "No kafka package, please build it and add it into classpath:"
print " $ sbt/sbt streaming-kafka/package"
print " $ bin/submit --driver-class-path lib_managed/jars/kafka_2.10-0.8.0.jar:" \
"external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar"
raise e
Contributor:
The message that gets printed here is quite scary.


2015-02-02 18:31:31.950 java[76691:5f03] Unable to load realm info from SCDynamicStore
No kafka package, please put the assembly jar into classpath:
 $ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar
Traceback (most recent call last):
  File "/Users/tdas/Projects/Spark/spark/examples/src/main/python/streaming/kafka_wordcount.py", line 46, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/Users/tdas/Projects/Spark/spark/python/pyspark/streaming/kafka.py", line 80, in createStream
    raise e
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.util.Utils.classForName.
: java.lang.ClassNotFoundException: kafka.serializer.DefaultDecoder
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:153)
    at org.apache.spark.util.Utils.classForName(Utils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:744)

It's easy to miss the real message. Is it possible to quit in such a way that the whole stack trace does not get printed, and instead exit gracefully after printing this message? Perhaps an exit? @JoshRosen, is that a good idea?
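
One possible way to do that, sketched below only to illustrate the suggestion (it is not what the PR currently does, which re-raises the error): print the hint and call sys.exit so the Java stack trace never reaches the console. create_fn is a hypothetical callable standing in for the ssc._jvm.KafkaUtils.createStream call.

```python
import sys

from py4j.java_gateway import Py4JError


def create_stream_or_exit(create_fn):
    """Call the Py4J-backed factory; on the confusing 'call a package' error,
    print the classpath hint and exit instead of surfacing the raw Java stack trace."""
    try:
        return create_fn()
    except Py4JError, e:
        if not e.message or 'call a package' in e.message:
            print >> sys.stderr, "No kafka package, please build it and add it into classpath:"
            print >> sys.stderr, " $ sbt/sbt streaming-kafka/package"
            sys.exit(1)  # quit gracefully; the hint is the whole story
        raise
```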

        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
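
For reference, a minimal usage sketch based only on the createStream signature above. The ZooKeeper address, group id, topic name, batch interval, and the JSON value decoder are made-up illustrations, and the Kafka jars still have to be on the driver classpath as discussed in the thread.

```python
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, utf8_decoder

sc = SparkContext(appName="KafkaJsonExample")
ssc = StreamingContext(sc, 2)

# Null-safe decoder in the same style as utf8_decoder: a missing key or value stays None.
json_decoder = lambda v: v and json.loads(v)

kvs = KafkaUtils.createStream(
    ssc, "localhost:2181", "example-consumer", {"events": 1},
    keyDecoder=utf8_decoder, valueDecoder=json_decoder)

kvs.pprint()
ssc.start()
ssc.awaitTermination()
```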