Skip to content

Commit

Permalink
Merge pull request #3 from mengxr/SPARK-6267
Browse files Browse the repository at this point in the history
use Vector to have the best Python 2&3 compatibility
  • Loading branch information
yanboliang committed May 6, 2015
2 parents 4bccfee + 7f202f9 commit f20541d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,12 @@ private[python] class PythonMLLibAPI extends Serializable {
data: JavaRDD[Vector],
isotonic: Boolean): JList[Object] = {
val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
val input = data.rdd.map { x =>
(x(0), x(1), x(2))
}.persist(StorageLevel.MEMORY_AND_DISK)
try {
val model = isotonicRegressionAlg.run(data.rdd.map(_.toArray).map {
x => (x(0), x(1), x(2)) }.persist(StorageLevel.MEMORY_AND_DISK))
List(model.boundaries, model.predictions).map(_.asInstanceOf[Object]).asJava
val model = isotonicRegressionAlg.run(input)
List[AnyRef](model.boundaryVector, model.predictionVector).asJava
} finally {
data.rdd.unpersist(blocking = false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.SQLContext

/**
* :: Experimental ::
Expand Down Expand Up @@ -140,6 +141,12 @@ class IsotonicRegressionModel (
}
}

/** A convenient method for boundaries called by the Python API. */
private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries)

/** A convenient method for boundaries called by the Python API. */
private[mllib] def predictionVector: Vector = Vectors.dense(predictions)

override def save(sc: SparkContext, path: String): Unit = {
IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
}
Expand Down
11 changes: 5 additions & 6 deletions python/pyspark/mllib/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,25 +445,24 @@ def save(self, sc, path):
def load(cls, sc, path):
java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
sc._jsc.sc(), path)
py_boundaries = _java2py(sc, java_model.boundaries())
py_predictions = _java2py(sc, java_model.predictions())
return IsotonicRegressionModel(np.array(py_boundaries),
np.array(py_predictions), java_model.isotonic)
py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
py_predictions = _java2py(sc, java_model.predictionVector()).toArray()
return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic)


class IsotonicRegression(object):
"""
Run IsotonicRegression algorithm to obtain isotonic regression model.
:param data: RDD of data points
:param data: RDD of (label, feature, weight) tuples.
:param isotonic: Whether this is isotonic or antitonic.
"""
@classmethod
def train(cls, data, isotonic=True):
"""Train a isotonic regression model on the given data."""
boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
data.map(_convert_to_vector), bool(isotonic))
return IsotonicRegressionModel(np.array(boundaries), np.array(predictions), isotonic)
return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)


def _test():
Expand Down

0 comments on commit f20541d

Please sign in to comment.