SPARK-1462: Examples of ML algorithms are using deprecated APIs
techaddict committed Apr 16, 2014
1 parent df36091 commit 6c7e543
Showing 7 changed files with 52 additions and 45 deletions.
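
The change is the same across all seven examples: the deprecated org.apache.spark.util.Vector is replaced by Breeze vector types (breeze.linalg.Vector / DenseVector), and the handful of vector operations the examples rely on are rewritten against the Breeze API. As a rough orientation only, and not part of the commit, the sketch below maps the old calls to their Breeze counterparts; the object name and values are illustrative.

```scala
import java.util.Random
import breeze.linalg.{DenseVector, squaredDistance}

// Illustrative sketch of the old-to-new API mapping; not code from this commit.
object BreezeMigrationSketch {
  def main(args: Array[String]): Unit = {
    val rand = new Random(42)
    val d = 10

    // Old: Vector(D, _ => 2 * rand.nextDouble - 1)
    // New: DenseVector.fill(D){2 * rand.nextDouble - 1}
    val w = DenseVector.fill(d){2 * rand.nextDouble - 1}

    // Old: Vector.zeros(D)        New: DenseVector.zeros[Double](D)
    val gradient = DenseVector.zeros[Double](d)

    // Old: new Vector(nums)       New: new DenseVector(nums) or DenseVector(nums)
    val fromArray = DenseVector(Array(1.0, 2.0, 3.0))

    // Old: w dot x                New: w.dot(x)
    val x = DenseVector.fill(d){rand.nextGaussian}
    val margin = w.dot(x)

    // Old: p.squaredDist(q)       New: squaredDistance(p, q)
    val dist = squaredDistance(w, x)

    println(s"gradient size = ${gradient.length}, fromArray = $fromArray, margin = $margin, dist = $dist")
  }
}
```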
examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -18,17 +18,17 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector
import breeze.linalg.{Vector, DenseVector}

object LocalFileLR {
val D = 10 // Numer of dimensions
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
DataPoint(new DenseVector(nums.slice(1, D + 1)), nums(0))
}

def main(args: Array[String]) {
@@ -37,15 +37,15 @@ object LocalFileLR {
val ITERATIONS = args(1).toInt

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
var gradient = Vector.zeros(D)
var gradient = DenseVector.zeros[Double](D)
for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
}
w -= gradient
}
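
One detail worth noting in the LocalFileLR hunk above: besides switching w dot p.x to w.dot(p.x), the gradient accumulation puts the vector on the left of the scalar (gradient += p.x * scale instead of scale * p.x). As a hedged aside, vector-times-scalar is the form Breeze defines uniformly for these vector types, whereas scalar-on-the-left multiplication is not guaranteed to resolve for a generic Vector[Double]. The tiny sketch below (values illustrative) shows the pattern used throughout the updated examples.

```scala
import breeze.linalg.DenseVector

object ScalarScalingSketch {
  def main(args: Array[String]): Unit = {
    val x = DenseVector(1.0, 2.0, 3.0)
    val scale = 0.5

    // Vector on the left, scalar on the right, as in "gradient += p.x * scale".
    val contribution = x * scale            // DenseVector(0.5, 1.0, 1.5)

    // In-place accumulation into a mutable gradient vector.
    val gradient = DenseVector.zeros[Double](3)
    gradient += contribution
    println(gradient)
  }
}
```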
examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -18,11 +18,13 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector
import org.apache.spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet

import breeze.linalg.{Vector, DenseVector}
import breeze.linalg.squaredDistance

/**
* K-means clustering.
*/
@@ -36,19 +38,19 @@ object LocalKMeans {

def generateData = {
def generatePoint(i: Int) = {
Vector(D, _ => rand.nextDouble * R)
DenseVector.fill(D){rand.nextDouble * R}
}
Array.tabulate(N)(generatePoint)
}

def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity

for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get
val tempDist = p.squaredDist(vCurr)
val tempDist = squaredDistance(p, vCurr)
if (tempDist < closest) {
closest = tempDist
bestIndex = i
@@ -60,8 +62,8 @@ object LocalKMeans {

def main(args: Array[String]) {
val data = generateData
var points = new HashSet[Vector]
var kPoints = new HashMap[Int, Vector]
var points = new HashSet[Vector[Double]]
var kPoints = new HashMap[Int, Vector[Double]]
var tempDist = 1.0

while (points.size < K) {
@@ -81,16 +83,16 @@ object LocalKMeans {
var mappings = closest.groupBy[Int] (x => x._1)

var pointStats = mappings.map { pair =>
pair._2.reduceLeft [(Int, (Vector, Int))] {
pair._2.reduceLeft [(Int, (Vector[Double], Int))] {
case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
}
}

var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1 * (1.0 / mapping._2._2))}

tempDist = 0.0
for (mapping <- newPoints) {
tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2)
}

for (newP <- newPoints) {
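
The k-means updates above make two related substitutions: p.squaredDist(q) becomes the Breeze function squaredDistance(p, q), and the centroid average sum / count becomes sum * (1.0 / count). A likely reason for the latter, stated as an assumption rather than fact, is that dividing a generic Vector[Double] by an Int count does not resolve cleanly in the Breeze version used, while multiplying by a Double reciprocal does. A small self-contained sketch of the update step (values illustrative):

```scala
import breeze.linalg.{DenseVector, Vector, squaredDistance}

object KMeansUpdateSketch {
  def main(args: Array[String]): Unit = {
    // Accumulated point sum and point count for one cluster (illustrative values).
    val sum: Vector[Double] = DenseVector(4.0, 6.2, 8.0)
    val count = 2

    // Averaging via multiplication by the reciprocal, as in the updated examples.
    val newCenter = sum * (1.0 / count)

    // Convergence check: squared distance between the old and new centers.
    val oldCenter: Vector[Double] = DenseVector(1.9, 3.1, 4.0)
    val movement = squaredDistance(oldCenter, newCenter)
    println(s"new center = $newCenter, movement = $movement")
  }
}
```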
14 changes: 7 additions & 7 deletions examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -18,7 +18,7 @@
package org.apache.spark.examples

import java.util.Random
import org.apache.spark.util.Vector
import breeze.linalg.{Vector, DenseVector}

/**
* Logistic regression based classification.
@@ -30,12 +30,12 @@ object LocalLR {
val ITERATIONS = 5
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def generateData = {
def generatePoint(i: Int) = {
val y = if(i % 2 == 0) -1 else 1
val x = Vector(D, _ => rand.nextGaussian + y * R)
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
Array.tabulate(N)(generatePoint)
@@ -45,15 +45,15 @@ object LocalLR {
val data = generateData

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
var gradient = Vector.zeros(D)
var gradient = DenseVector.zeros[Double](D)
for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
gradient += scale * p.x
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
}
w -= gradient
}
examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -19,19 +19,20 @@ package org.apache.spark.examples

import java.util.Random
import scala.math.exp
import org.apache.spark.util.Vector
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo

import breeze.linalg.{Vector, DenseVector}

/**
* Logistic regression based classification.
*/
object SparkHdfsLR {
val D = 10 // Numer of dimensions
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def parsePoint(line: String): DataPoint = {
val tok = new java.util.StringTokenizer(line, " ")
@@ -41,7 +42,7 @@ object SparkHdfsLR {
while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1
}
DataPoint(new Vector(x), y)
DataPoint(new DenseVector(x), y)
}

def main(args: Array[String]) {
@@ -61,13 +62,13 @@ object SparkHdfsLR {
val ITERATIONS = args(2).toInt

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
val gradient = points.map { p =>
(1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}
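
In the Spark-based variants (SparkHdfsLR above, and SparkLR / SparkTachyonHdfsLR below), each point is mapped to its gradient contribution as a Breeze vector and the contributions are summed with .reduce(_ + _), which works because Breeze defines + on these vectors. The sketch below mimics that map/reduce shape on a plain Scala collection instead of an RDD, so it runs without a cluster; the data and dimensions are made up for illustration.

```scala
import java.util.Random
import scala.math.exp
import breeze.linalg.DenseVector

object GradientReduceSketch {
  case class DataPoint(x: DenseVector[Double], y: Double)

  def main(args: Array[String]): Unit = {
    val rand = new Random(42)
    val d = 10

    // In-memory stand-in for the RDD of parsed points.
    val points = Seq.fill(100) {
      val y = if (rand.nextBoolean()) 1.0 else -1.0
      DataPoint(DenseVector.fill(d){rand.nextGaussian + y}, y)
    }

    var w = DenseVector.fill(d){2 * rand.nextDouble - 1}

    // Same shape as the RDD version: map each point to a Breeze vector
    // contribution, then sum the vectors with reduce(_ + _).
    val gradient = points.map { p =>
      p.x * ((1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y)
    }.reduce(_ + _)

    w -= gradient
    println("Updated w: " + w)
  }
}
```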
examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -19,27 +19,29 @@ package org.apache.spark.examples

import java.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.util.Vector
import org.apache.spark.SparkContext._

import breeze.linalg.{Vector, DenseVector}
import breeze.linalg.squaredDistance

/**
* K-means clustering.
*/
object SparkKMeans {
val R = 1000 // Scaling factor
val rand = new Random(42)

def parseVector(line: String): Vector = {
new Vector(line.split(' ').map(_.toDouble))
def parseVector(line: String): Vector[Double] = {
DenseVector(line.split(' ').map(_.toDouble))
}

def closestPoint(p: Vector, centers: Array[Vector]): Int = {
def closestPoint(p: Vector[Double], centers: Array[Vector[Double]]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity

for (i <- 0 until centers.length) {
val tempDist = p.squaredDist(centers(i))
val tempDist = squaredDistance(p, centers(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
@@ -69,11 +71,11 @@ object SparkKMeans {

val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}

val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
val newPoints = pointStats.map {pair => (pair._1, pair._2._1 * (1.0 / pair._2._2))}.collectAsMap()

tempDist = 0.0
for (i <- 0 until K) {
tempDist += kPoints(i).squaredDist(newPoints(i))
tempDist += squaredDistance(kPoints(i), newPoints(i))
}

for (newP <- newPoints) {
11 changes: 6 additions & 5 deletions examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -19,9 +19,10 @@ package org.apache.spark.examples

import java.util.Random
import scala.math.exp
import org.apache.spark.util.Vector
import org.apache.spark._

import breeze.linalg.{Vector, DenseVector}

/**
* Logistic regression based classification.
*/
@@ -32,12 +33,12 @@ object SparkLR {
val ITERATIONS = 5
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def generateData = {
def generatePoint(i: Int) = {
val y = if(i % 2 == 0) -1 else 1
val x = Vector(D, _ => rand.nextGaussian + y * R)
val x = DenseVector.fill(D){rand.nextGaussian + y * R}
DataPoint(x, y)
}
Array.tabulate(N)(generatePoint)
@@ -54,13 +55,13 @@ object SparkLR {
val points = sc.parallelize(generateData, numSlices).cache()

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
val gradient = points.map { p =>
(1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}
examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -19,12 +19,13 @@ package org.apache.spark.examples

import java.util.Random
import scala.math.exp
import org.apache.spark.util.Vector
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.storage.StorageLevel

import breeze.linalg.{Vector, DenseVector}

/**
* Logistic regression based classification.
* This example uses Tachyon to persist rdds during computation.
@@ -33,7 +34,7 @@ object SparkTachyonHdfsLR {
val D = 10 // Numer of dimensions
val rand = new Random(42)

case class DataPoint(x: Vector, y: Double)
case class DataPoint(x: Vector[Double], y: Double)

def parsePoint(line: String): DataPoint = {
val tok = new java.util.StringTokenizer(line, " ")
@@ -43,7 +44,7 @@ object SparkTachyonHdfsLR {
while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1
}
DataPoint(new Vector(x), y)
DataPoint(new DenseVector(x), y)
}

def main(args: Array[String]) {
@@ -63,13 +64,13 @@ object SparkTachyonHdfsLR {
val ITERATIONS = args(2).toInt

// Initialize w to a random value
var w = Vector(D, _ => 2 * rand.nextDouble - 1)
var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
val gradient = points.map { p =>
(1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}
