diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
new file mode 100644
index 0000000000000..01d0c2faed93a
--- /dev/null
+++ b/external/mqtt-assembly/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External MQTT Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
+    <spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
+    <spark.jar.basename>spark-streaming-mqtt-assembly-${project.version}.jar</spark.jar.basename>
+    <spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${spark.jar}</outputFile>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                  <resource>log4j.properties</resource>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pom.xml b/pom.xml
index ffa96128a3d61..df06632997029 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
     <module>external/flume-sink</module>
     <module>external/flume-assembly</module>
     <module>external/mqtt</module>
+    <module>external/mqtt-assembly</module>
     <module>external/zeromq</module>
     <module>examples</module>
     <module>repl</module>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 3408c6d51ed4c..7555aa5e3861e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,8 +45,8 @@ object BuildCommons {
     sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
     "kinesis-asl").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
-    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
+  val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingMqttAssembly) =
+    Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-mqtt-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val tools = ProjectRef(buildLocation, "tools")
@@ -347,7 +347,7 @@ object Assembly {
         .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
-      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
+      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-mqtt-assembly")) {
         // This must match the same name used in maven (see external/kafka-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
new file mode 100644
index 0000000000000..423869bffdf74
--- /dev/null
+++ b/python/pyspark/streaming/mqtt.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import java_import, Py4JError
+
+from pyspark.storagelevel import StorageLevel
+from pyspark.serializers import PairDeserializer, NoOpSerializer
+from pyspark.streaming import DStream
+
+__all__ = ['MQTTUtils']
+
+
+class MQTTUtils(object):
+
+ @staticmethod
+    def createStream(ssc, brokerUrl, topic,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+        """
+        Create an input stream that pulls messages from an MQTT broker.
+
+        :param ssc: StreamingContext object
+        :param brokerUrl: URL of the remote MQTT publisher
+        :param topic: topic name to subscribe to
+        :param storageLevel: RDD storage level
+        :return: A DStream object
+        """
+ java_import(ssc._jvm, "org.apache.spark.streaming.mqtt.MQTTUtils")
+
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+
+        try:
+            jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel)
+        except Py4JError, e:
+            # TODO: use --jars once it also works on the driver
+            if not e.message or 'call a package' in e.message:
+                print "No MQTT package, please add the assembly jar to the classpath:"
+                print "  $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \
+                      "scala-*/spark-streaming-mqtt-assembly-*.jar"
+            raise e
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream
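
For context, a minimal driver program against the new Python API might look like the sketch below. The broker URL "tcp://localhost:1883", the topic name, and the app name are placeholders, not part of this patch, and the assembly jar is assumed to be on the driver classpath as the error message above suggests:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

# Hypothetical example app: print records received from an MQTT topic each batch.
sc = SparkContext(appName="MQTTStreamSketch")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

stream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensor/readings")
stream.pprint()

ssc.start()
ssc.awaitTermination()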