Merge pull request apache#380 from mateiz/py-bayes
Add Naive Bayes to Python MLlib, and some API fixes

- Added a Python wrapper for Naive Bayes
- Updated the Scala Naive Bayes to match the style of our other
  algorithms better, and in particular to make it easier to call from Java
  (added a builder pattern, removed the default value in the train method)
- Updated the Python MLlib functions to not require a SparkContext; we can
  get it from the RDD the user gives (see the sketch below)
- Added a toString method to LabeledPoint
- Made the Python MLlib tests run as part of run-tests as well (before,
  they could only be run individually through each file)
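
A rough sketch of the resulting PySpark usage (illustrative only, not taken verbatim from this commit; it assumes the wrapper lands in `pyspark.mllib.classification` alongside the other classifiers, and the input layout, label first and then features, is inferred from the `trainNaiveBayes` stub in this diff):

{% highlight python %}
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.classification import NaiveBayes

sc = SparkContext("local", "NaiveBayesSketch")

# Each training point: label first, then the feature counts, matching the
# trainNaiveBayes stub below (x(0) is the label, the rest are features).
data = sc.parallelize([
    array([0.0, 1.0, 0.0, 0.0]),
    array([1.0, 0.0, 1.0, 0.0]),
    array([2.0, 0.0, 0.0, 1.0]),
])

# No SparkContext argument any more; it comes from the RDD itself.
model = NaiveBayes.train(data, 1.0)  # second argument is the smoothing lambda
print model.predict(array([0.0, 1.0, 0.0]))
{% endhighlight %}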
pwendell committed Jan 14, 2014
2 parents 4a805af + cc93c2a commit fdaabdc
Showing 20 changed files with 297 additions and 58 deletions.
2 changes: 1 addition & 1 deletion docs/_config.yml
@@ -5,6 +5,6 @@ markdown: kramdown
# of Spark, Scala, and Mesos.
SPARK_VERSION: 0.9.0-incubating-SNAPSHOT
SPARK_VERSION_SHORT: 0.9.0
-SCALA_VERSION: 2.10
+SCALA_VERSION: "2.10"
MESOS_VERSION: 0.13.0
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
19 changes: 14 additions & 5 deletions docs/mllib-guide.md
@@ -21,6 +21,8 @@ depends on native Fortran routines. You may need to install the
if it is not already present on your nodes. MLlib will throw a linking error if it cannot
detect these libraries automatically.

To use MLlib in Python, you will also need [NumPy](http://www.numpy.org) version 1.7 or newer.

# Binary Classification

Binary classification is a supervised learning problem in which we want to
@@ -316,6 +318,13 @@ other signals), you can use the trainImplicit method to get better results.
val model = ALS.trainImplicit(ratings, 1, 20, 0.01)
{% endhighlight %}

# Using MLlib in Java

All of MLlib's methods use Java-friendly types, so you can import and call them from Java the
same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the
Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by
calling `.rdd()` on your `JavaRDD` object.

# Using MLlib in Python
The following examples can be tested in the PySpark shell.

@@ -330,7 +339,7 @@ from numpy import array
# Load and parse the data
data = sc.textFile("mllib/data/sample_svm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
-model = LogisticRegressionWithSGD.train(sc, parsedData)
+model = LogisticRegressionWithSGD.train(parsedData)

# Evaluate the model on training data
labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
@@ -356,7 +365,7 @@ data = sc.textFile("mllib/data/ridge-data/lpsa.data")
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))

# Build the model
-model = LinearRegressionWithSGD.train(sc, parsedData)
+model = LinearRegressionWithSGD.train(parsedData)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda point: (point.item(0),
@@ -382,7 +391,7 @@ data = sc.textFile("kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
-clusters = KMeans.train(sc, parsedData, 2, maxIterations=10,
+clusters = KMeans.train(parsedData, 2, maxIterations=10,
runs=30, initialization_mode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
@@ -411,7 +420,7 @@ data = sc.textFile("mllib/data/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))

# Build the recommendation model using Alternating Least Squares
-model = ALS.train(sc, ratings, 1, 20)
+model = ALS.train(ratings, 1, 20)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
@@ -426,5 +435,5 @@ signals), you can use the trainImplicit method to get better results.

{% highlight python %}
# Build the recommendation model using Alternating Least Squares based on implicit ratings
-model = ALS.trainImplicit(sc, ratings, 1, 20)
+model = ALS.trainImplicit(ratings, 1, 20)
{% endhighlight %}
8 changes: 7 additions & 1 deletion docs/python-programming-guide.md
@@ -52,7 +52,7 @@ In addition, PySpark fully supports interactive use---simply run `./bin/pyspark`

# Installing and Configuring PySpark

-PySpark requires Python 2.6 or higher.
+PySpark requires Python 2.7 or higher.
PySpark applications are executed using a standard CPython interpreter in order to support Python modules that use C extensions.
We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/).

@@ -149,6 +149,12 @@ sc = SparkContext(conf = conf)
[API documentation](api/pyspark/index.html) for PySpark is available as Epydoc.
Many of the methods also contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.

# Libraries

[MLlib](mllib-guide.html) is also available in PySpark. To use it, you'll need
[NumPy](http://www.numpy.org) version 1.7 or newer. The [MLlib guide](mllib-guide.html) contains
some example applications.
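
For example, from the PySpark shell (a minimal sketch mirroring the k-means example in the
MLlib guide; `kmeans_data.txt` is assumed to hold space-separated numbers):

{% highlight python %}
from numpy import array
from pyspark.mllib.clustering import KMeans

# `sc` is the SparkContext the PySpark shell creates for you
parsedData = sc.textFile("kmeans_data.txt").map(
    lambda line: array([float(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10)
{% endhighlight %}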

# Where to Go from Here

PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
6 changes: 6 additions & 0 deletions mllib/data/sample_naive_bayes_data.txt
@@ -0,0 +1,6 @@
0, 1 0 0
0, 2 0 0
1, 0 1 0
1, 0 2 0
2, 0 0 1
2, 0 0 2
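
Each line holds a label, a comma, and space-separated feature counts. A hedged sketch of how a
file in this format could be parsed and fed to the new Python Naive Bayes (`parse_point` is an
illustrative helper, not part of this commit):

{% highlight python %}
from numpy import array
from pyspark.mllib.classification import NaiveBayes

def parse_point(line):
    # "0, 1 0 0" -> array([0.0, 1.0, 0.0, 0.0]): label first, then features
    label, features = line.split(', ')
    return array([float(label)] + [float(x) for x in features.split(' ')])

data = sc.textFile("mllib/data/sample_naive_bayes_data.txt").map(parse_point)
model = NaiveBayes.train(data, 1.0)
{% endhighlight %}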
@@ -183,6 +183,23 @@ class PythonMLLibAPI extends Serializable {
dataBytesJRDD, initialWeightsBA)
}

/**
* Java stub for NaiveBayes.train()
*/
def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
: java.util.List[java.lang.Object] =
{
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
LabeledPoint(x(0), x.slice(1, x.length))
})
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.pi))
ret.add(serializeDoubleMatrix(model.theta))
ret
}

/**
* Java stub for Python mllib KMeans.train()
*/
@@ -97,7 +97,7 @@ object LogisticRegressionWithSGD {
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
-* @param initialWeights Initial set of weights to be used. Array should be equal in size to
+* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
@@ -183,6 +183,8 @@ object LogisticRegressionWithSGD {
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Intercept: " + model.intercept)

sc.stop()
}
@@ -21,17 +21,18 @@ import scala.collection.mutable

import org.jblas.DoubleMatrix

-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.util.MLUtils

/**
* Model for Naive Bayes Classifiers.
*
* @param pi Log of class priors, whose dimension is C.
-* @param theta Log of class conditional probabilities, whose dimension is CXD.
+* @param theta Log of class conditional probabilities, whose dimension is CxD.
*/
-class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
+class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
extends ClassificationModel with Serializable {

// Create a column vector that can be used for predictions
@@ -50,10 +51,21 @@ class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
-* @param lambda The smooth parameter
+* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
+* document classification. By making every vector a 0-1 vector, it can also be used as
+* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*/
-class NaiveBayes private (val lambda: Double = 1.0)
-  extends Serializable with Logging {
+class NaiveBayes private (var lambda: Double)
+  extends Serializable with Logging
+{
+  def this() = this(1.0)
+
+  /** Set the smoothing parameter. Default: 1.0. */
+  def setLambda(lambda: Double): NaiveBayes = {
+    this.lambda = lambda
+    this
+  }

/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
@@ -106,14 +118,49 @@ object NaiveBayes {
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
-* document classification. By making every vector a 0-1 vector. it can also be used as
+* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*
* This version of the method uses a default smoothing parameter of 1.0.
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
*/
def train(input: RDD[LabeledPoint]): NaiveBayesModel = {
new NaiveBayes().run(input)
}

/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
-* @param lambda The smooth parameter
+* @param lambda The smoothing parameter
*/
-def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = {
+def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
new NaiveBayes(lambda).run(input)
}

def main(args: Array[String]) {
if (args.length != 2 && args.length != 3) {
println("Usage: NaiveBayes <master> <input_dir> [<lambda>]")
System.exit(1)
}
val sc = new SparkContext(args(0), "NaiveBayes")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = if (args.length == 2) {
NaiveBayes.train(data)
} else {
NaiveBayes.train(data, args(2).toDouble)
}
println("Pi: " + model.pi.mkString("[", ", ", "]"))
println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]"))

sc.stop()
}
}
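
As the scaladoc above notes, binarizing the feature vectors turns the Multinomial model into
Bernoulli NB. A sketch of that preprocessing on the Python side (the `binarize` helper is
illustrative, not part of this commit; `data` is an RDD of label-prefixed vectors as in the
parsing sketch earlier):

{% highlight python %}
from numpy import array
from pyspark.mllib.classification import NaiveBayes

def binarize(point):
    # Keep the label (first element); clamp every feature count to 0 or 1
    return array([point[0]] + [1.0 if x > 0 else 0.0 for x in point[1:]])

binary_data = data.map(binarize)
model = NaiveBayes.train(binary_data, 1.0)
{% endhighlight %}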
@@ -183,6 +183,8 @@ object SVMWithSGD {
val sc = new SparkContext(args(0), "SVM")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Intercept: " + model.intercept)

sc.stop()
}
@@ -23,4 +23,8 @@ package org.apache.spark.mllib.regression
* @param label Label for this data point.
* @param features List of features for this data point.
*/
-case class LabeledPoint(val label: Double, val features: Array[Double])
+case class LabeledPoint(label: Double, features: Array[Double]) {
+  override def toString: String = {
+    "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]"))
+  }
+}
@@ -121,7 +121,7 @@ object LassoWithSGD {
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param regParam Regularization parameter.
* @param miniBatchFraction Fraction of data to be used per iteration.
-* @param initialWeights Initial set of weights to be used. Array should be equal in size to
+* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
@@ -205,6 +205,8 @@ object LassoWithSGD {
val sc = new SparkContext(args(0), "Lasso")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Intercept: " + model.intercept)

sc.stop()
}
@@ -162,6 +162,8 @@ object LinearRegressionWithSGD {
val sc = new SparkContext(args(0), "LinearRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Intercept: " + model.intercept)

sc.stop()
}
@@ -122,7 +122,7 @@ object RidgeRegressionWithSGD {
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param regParam Regularization parameter.
* @param miniBatchFraction Fraction of data to be used per iteration.
-* @param initialWeights Initial set of weights to be used. Array should be equal in size to
+* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
@@ -208,6 +208,8 @@ object RidgeRegressionWithSGD {
val data = MLUtils.loadLabeledData(sc, args(1))
val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
args(3).toDouble)
println("Weights: " + model.weights.mkString("[", ", ", "]"))
println("Intercept: " + model.intercept)

sc.stop()
}
@@ -0,0 +1,72 @@
package org.apache.spark.mllib.classification;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class JavaNaiveBayesSuite implements Serializable {
private transient JavaSparkContext sc;

@Before
public void setUp() {
sc = new JavaSparkContext("local", "JavaNaiveBayesSuite");
}

@After
public void tearDown() {
sc.stop();
sc = null;
System.clearProperty("spark.driver.port");
}

private static final List<LabeledPoint> POINTS = Arrays.asList(
new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}),
new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}),
new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}),
new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}),
new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}),
new LabeledPoint(2, new double[] {0.0, 0.0, 2.0})
);

private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) {
int correct = 0;
for (LabeledPoint p: points) {
if (model.predict(p.features()) == p.label()) {
correct += 1;
}
}
return correct;
}

@Test
public void runUsingConstructor() {
JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache();

NaiveBayes nb = new NaiveBayes().setLambda(1.0);
NaiveBayesModel model = nb.run(testRDD.rdd());

int numAccurate = validatePrediction(POINTS, model);
Assert.assertEquals(POINTS.size(), numAccurate);
}

@Test
public void runUsingStaticMethods() {
JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache();

NaiveBayesModel model1 = NaiveBayes.train(testRDD.rdd());
int numAccurate1 = validatePrediction(POINTS, model1);
Assert.assertEquals(POINTS.size(), numAccurate1);

NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5);
int numAccurate2 = validatePrediction(POINTS, model2);
Assert.assertEquals(POINTS.size(), numAccurate2);
}
}
2 changes: 1 addition & 1 deletion python/pyspark/mllib/_common.py
@@ -16,7 +16,7 @@
#

from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape
-from pyspark import SparkContext
+from pyspark import SparkContext, RDD

from pyspark.serializers import Serializer
import struct