Skip to content

Commit

Permalink
[SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python
Browse files Browse the repository at this point in the history
This PR is based on apache#4229, thanks prabeesh.

Closes apache#4229

Author: Prabeesh K <prabsmails@gmail.com>
Author: zsxwing <zsxwing@gmail.com>
Author: prabs <prabsmails@gmail.com>
Author: Prabeesh K <prabeesh.k@namshi.com>

Closes apache#7833 from zsxwing/pr4229 and squashes the following commits:

9570bec [zsxwing] Fix the variable name and check null in finally
4a9c79e [zsxwing] Fix pom.xml indentation
abf5f18 [zsxwing] Merge branch 'master' into pr4229
935615c [zsxwing] Fix the flaky MQTT tests
47278c5 [zsxwing] Include the project class files
478f844 [zsxwing] Add unpack
5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT tests
734db99 [zsxwing] Merge branch 'master' into pr4229
126608a [Prabeesh K] address the comments
b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229
d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; Revert unncessary changes; fix the python unit test
a6747cb [Prabeesh K] wait for starting the receiver before publishing data
87fc677 [Prabeesh K] address the comments:
97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt
80474d1 [Prabeesh K] fix
1f0cfe9 [Prabeesh K] python style fix
e1ee016 [Prabeesh K] scala style fix
a5a8f9f [Prabeesh K] added Python test
9767d82 [Prabeesh K] implemented Python-friendly class
a11968b [Prabeesh K] fixed python style
795ec27 [Prabeesh K] address comments
ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly
3f4df12 [Prabeesh K] updated version
b34c3c1 [prabs] adress comments
3aa7fff [prabs] Added Python streaming mqtt word count example
b7d42ff [prabs] Mqtt streaming support in Python

(cherry picked from commit 853809e)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
  • Loading branch information
prabeesh authored and tdas committed Aug 10, 2015
1 parent 51406be commit 8f4014f
Show file tree
Hide file tree
Showing 14 changed files with 565 additions and 109 deletions.
2 changes: 2 additions & 0 deletions dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ def build_spark_sbt(hadoop_version):
"assembly/assembly",
"streaming-kafka-assembly/assembly",
"streaming-flume-assembly/assembly",
"streaming-mqtt-assembly/assembly",
"streaming-mqtt/test:assembly",
"streaming-kinesis-asl-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals

Expand Down
2 changes: 2 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def contains_file(self, filename):
dependencies=[streaming],
source_file_regexes=[
"external/mqtt",
"external/mqtt-assembly",
],
sbt_test_goals=[
"streaming-mqtt/test",
Expand Down Expand Up @@ -306,6 +307,7 @@ def contains_file(self, filename):
streaming,
streaming_kafka,
streaming_flume_assembly,
streaming_mqtt,
streaming_kinesis_asl
],
source_file_regexes=[
Expand Down
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
out of these sources, *only* Kafka and Flume are available in the Python API. We will add more advanced sources in the Python API in future.
out of these sources, *only* Kafka, Flume and MQTT are available in the Python API. We will add more advanced sources in the Python API in future.

This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
Expand Down
58 changes: 58 additions & 0 deletions examples/src/main/python/streaming/mqtt_wordcount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A sample wordcount with MqttStream stream
Usage: mqtt_wordcount.py <broker url> <topic>
To run this in your local machine, you need to setup a MQTT broker and publisher first,
Mosquitto is one of the open source MQTT Brokers, see
http://mosquitto.org/
Eclipse paho project provides number of clients and utilities for working with MQTT, see
http://www.eclipse.org/paho/#getting-started
and then run the example
`$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
tcp://localhost:1883 foo`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
exit(-1)

sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)

brokerUrl = sys.argv[1]
topic = sys.argv[2]

lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
102 changes: 102 additions & 0 deletions external/mqtt-assembly/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project External MQTT Assembly</name>
<url>http://spark.apache.org/</url>

<properties>
<sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
28 changes: 28 additions & 0 deletions external/mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,33 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>

<plugins>
<!-- Assemble a jar with test dependencies for Python tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>test-jar-with-dependencies</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<!-- Make sure the file path is same as the sbt build -->
<finalName>spark-streaming-mqtt-test-${project.version}</finalName>
<outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
<appendAssemblyId>false</appendAssemblyId>
<!-- Don't publish it since it's only for Python tests -->
<attach>false</attach>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
44 changes: 44 additions & 0 deletions external/mqtt/src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly>
<id>test-jar-with-dependencies</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>

<fileSets>
<fileSet>
<directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useTransitiveDependencies>true</useTransitiveDependencies>
<scope>test</scope>
<unpack>true</unpack>
<excludes>
<exclude>org.apache.hadoop:*:jar</exclude>
<exclude>org.apache.zookeeper:*:jar</exclude>
<exclude>org.apache.avro:*:jar</exclude>
</excludes>
</dependencySet>
</dependencySets>

</assembly>
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,19 @@ object MQTTUtils {
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
}

/**
* This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
* function so that it can be easily instantiated and called from Python's MQTTUtils.
*/
private class MQTTUtilsPythonHelper {

def createStream(
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
): JavaDStream[String] = {
MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
}
}
Loading

0 comments on commit 8f4014f

Please sign in to comment.