[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python #3715
Conversation
Test build #24510 has started for PR 3715 at commit
Test build #24510 has finished for PR 3715 at commit
Test FAILed.
Test build #24511 has started for PR 3715 at commit
Test build #24511 has finished for PR 3715 at commit
Test PASSed.
LGTM
Test build #24609 has started for PR 3715 at commit
Test build #24609 has finished for PR 3715 at commit
Test FAILed.
Test build #24610 has started for PR 3715 at commit
Test build #24610 has finished for PR 3715 at commit
Test FAILed.
Test build #25267 has started for PR 3715 at commit
Test build #25267 has finished for PR 3715 at commit
Test FAILed.
      dataOut.write(bytes)
    }
  }
  def writeS(str: String) {
new line missing.
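For context, the `writeS` helper under review sends a length-prefixed UTF-8 string from the JVM to the Python side. A minimal Python sketch of the same framing convention (4-byte big-endian length, then the encoded bytes); the function name `write_s` is hypothetical and this is not the actual Spark implementation:

```python
import struct


def write_s(stream, s):
    # Length-prefixed UTF-8 framing, mirroring the JVM-side writeS helper
    # discussed above (sketch only; names are hypothetical).
    data = s.encode("utf-8")
    stream.write(struct.pack(">i", len(data)))  # 4-byte big-endian length
    stream.write(data)                          # then the raw UTF-8 bytes
```

The Python deserializer can then read the 4-byte length first and know exactly how many payload bytes follow.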
Kafka-assembly for Python API
Test build #25951 has started for PR 3715 at commit
Conflicts:
	make-distribution.sh
	project/SparkBuild.scala
Test build #26331 has started for PR 3715 at commit
Test build #26331 has finished for PR 3715 at commit
Test PASSed.
Conflicts:
	python/pyspark/tests.py
Test build #26339 has started for PR 3715 at commit
Test build #26339 has finished for PR 3715 at commit
Test PASSed.
  def write(obj: Any): Unit = obj match {
    case null =>
      dataOut.writeInt(SpecialLengths.NULL)

Why the extra line?
group them into different categories.
Test build #26361 has started for PR 3715 at commit
Test build #26361 has finished for PR 3715 at commit
Test PASSed.
Test build #26578 has started for PR 3715 at commit
        print "No kafka package, please put the assembly jar into classpath:"
        print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
            "scala-*/spark-streaming-kafka-assembly-*.jar"
        raise e
The message that gets printed here is quite scary.
2015-02-02 18:31:31.950 java[76691:5f03] Unable to load realm info from SCDynamicStore
No kafka package, please put the assembly jar into classpath:
$ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar
Traceback (most recent call last):
File "/Users/tdas/Projects/Spark/spark/examples/src/main/python/streaming/kafka_wordcount.py", line 46, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/Users/tdas/Projects/Spark/spark/python/pyspark/streaming/kafka.py", line 80, in createStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.util.Utils.classForName.
: java.lang.ClassNotFoundException: kafka.serializer.DefaultDecoder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.util.Utils$.classForName(Utils.scala:153)
at org.apache.spark.util.Utils.classForName(Utils.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:744)
It's easy to miss the real message. Is it possible to quit in such a way that the whole stack trace does not get printed, and instead exit gracefully after printing this message? Perhaps an exit? @JoshRosen Is that a good idea?
I am merging despite a small remaining comment from me. Thanks @davies and others for helping!
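The suggestion above (print the short hint and exit instead of re-raising, which dumps the full Py4J stack trace) could be sketched like this. `create_stream_safely` and the `create_stream` callable are hypothetical names for illustration, not the fix that actually landed:

```python
import sys


def create_stream_safely(ssc, zkQuorum, groupId, topics, create_stream):
    # Wrap the Py4J call; if the Kafka assembly jar is missing, print a
    # short hint and exit instead of re-raising (re-raising prints the
    # entire Java stack trace, which buries the real message).
    # Sketch only; the real KafkaUtils.createStream signature may differ.
    try:
        return create_stream(ssc, zkQuorum, groupId, topics)
    except Exception as e:
        if "ClassNotFoundException" in str(e):
            print("No kafka package, please put the assembly jar into classpath:")
            print("  $ bin/spark-submit --driver-class-path "
                  "external/kafka-assembly/target/scala-*/"
                  "spark-streaming-kafka-assembly-*.jar")
            sys.exit(1)
        raise
```

The user then sees only the two-line hint and a clean exit code, rather than twenty lines of `py4j` frames.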
Test build #26578 has finished for PR 3715 at commit
Test PASSed.
This PR brings the Python API for the Spark Streaming Kafka data source.
Run the example:
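The exact run command is truncated above; a minimal driver sketch mirroring the `kafka_wordcount.py` example seen in the traceback earlier (the `zkQuorum` and `topic` values are placeholders, and the script must be submitted with the kafka-assembly jar on the driver classpath as noted above):

```python
def main():
    # Requires pyspark with the spark-streaming-kafka assembly jar on the
    # driver classpath; imports are deferred so this sketch can be defined
    # without a Spark installation.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    zkQuorum, topic = "localhost:2181", "test"  # placeholders
    kvs = KafkaUtils.createStream(ssc, zkQuorum,
                                  "spark-streaming-consumer", {topic: 1})
    counts = (kvs.map(lambda x: x[1])                      # drop the key
                 .flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
```

Invoke `main()` from a script submitted via `bin/spark-submit` with the `--driver-class-path` shown in the error-message discussion above.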