Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache-github/branch-0.9' into branch-0…
Browse files Browse the repository at this point in the history
….9-streaming-docs
  • Loading branch information
tdas committed Mar 20, 2014
2 parents d792351 + 1cc979e commit 98c3e98
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 195 deletions.
201 changes: 141 additions & 60 deletions mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.mllib.recommendation

import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.math.{abs, sqrt}
import scala.util.Random
import scala.util.Sorting

Expand Down Expand Up @@ -63,7 +64,7 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
* Alternating Least Squares matrix factorization.
*
* ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
* `X` and `Y`, i.e. `Xt * Y = R`. Typically these approximations are called 'factor' matrices.
* `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices.
* The general approach is iterative. During each iteration, one of the factor matrices is held
* constant, while the other is solved for using least squares. The newly-solved factor matrix is
* then held constant while solving for the other factor matrix.
Expand All @@ -80,17 +81,22 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
*
* For implicit preference data, the algorithm used is based on
* "Collaborative Filtering for Implicit Feedback Datasets", available at
* [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here.
* [[http://dx.doi.org/10.1109/ICDM.2008.22]], adapted for the blocked approach used here.
*
* Essentially instead of finding the low-rank approximations to the rating matrix `R`,
* this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0
* and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user
* preferences rather than explicit ratings given to items.
*/
class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double,
var implicitPrefs: Boolean, var alpha: Double)
extends Serializable with Logging
{
class ALS private (
var numBlocks: Int,
var rank: Int,
var iterations: Int,
var lambda: Double,
var implicitPrefs: Boolean,
var alpha: Double,
var seed: Long = System.nanoTime()
) extends Serializable with Logging {
def this() = this(-1, 10, 10, 0.01, false, 1.0)

/**
Expand Down Expand Up @@ -130,6 +136,12 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
this
}

/** Sets a random seed to have deterministic results. */
def setSeed(seed: Long): ALS = {
this.seed = seed
this
}

/**
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
Expand All @@ -151,9 +163,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)

// Initialize user and product factors randomly, but use a deterministic seed for each partition
// so that fault recovery works
val seedGen = new Random()
// Initialize user and product factors randomly, but use a deterministic seed for each
// partition so that fault recovery works
val seedGen = new Random(seed)
val seed1 = seedGen.nextInt()
val seed2 = seedGen.nextInt()
// Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
Expand Down Expand Up @@ -208,21 +220,46 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
*/
def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
if (implicitPrefs) {
Option(
factors.flatMapValues { case factorArray =>
factorArray.view.map { vector =>
val x = new DoubleMatrix(vector)
x.mmul(x.transpose())
}
}.reduceByKeyLocally((a, b) => a.addi(b))
.values
.reduce((a, b) => a.addi(b))
)
val n = rank * (rank + 1) / 2
val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
L
}, combOp = (L1, L2) => {
L1.addi(L2)
})
val YtY = new DoubleMatrix(rank, rank)
fillFullMatrix(LYtY, YtY)
Option(YtY)
} else {
None
}
}

/**
* Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR.
*
* @param L the lower triangular part of the matrix packed in an array (row major)
*/
private def dspr(alpha: Double, x: DoubleMatrix, L: DoubleMatrix) = {
val n = x.length
var i = 0
var j = 0
var idx = 0
var axi = 0.0
val xd = x.data
val Ld = L.data
while (i < n) {
axi = alpha * xd(i)
j = 0
while (j <= i) {
Ld(idx) += axi * xd(j)
j += 1
idx += 1
}
i += 1
}
}

/**
* Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
*/
Expand Down Expand Up @@ -301,7 +338,14 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* Make a random factor vector with the given random.
*/
private def randomFactor(rank: Int, rand: Random): Array[Double] = {
Array.fill(rank)(rand.nextDouble)
// Choose a unit vector uniformly at random from the unit sphere, but from the
// "first quadrant" where all elements are nonnegative. This can be done by choosing
// elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
// This appears to create factorizations that have a slightly better reconstruction
// (<1%) compared picking elements uniformly at random in [0,1].
val factor = Array.fill(rank)(abs(rand.nextGaussian()))
val norm = sqrt(factor.map(x => x * x).sum)
factor.map(x => x / norm)
}

/**
Expand Down Expand Up @@ -365,51 +409,41 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
for (productBlock <- 0 until numBlocks) {
for (p <- 0 until blockFactors(productBlock).length) {
val x = new DoubleMatrix(blockFactors(productBlock)(p))
fillXtX(x, tempXtX)
tempXtX.fill(0.0)
dspr(1.0, x, tempXtX)
val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
for (i <- 0 until us.length) {
implicitPrefs match {
case false =>
userXtX(us(i)).addi(tempXtX)
SimpleBlas.axpy(rs(i), x, userXy(us(i)))
case true =>
userXtX(us(i)).addi(tempXtX.mul(alpha * rs(i)))
SimpleBlas.axpy(1 + alpha * rs(i), x, userXy(us(i)))
// Extension to the original paper to handle rs(i) < 0. confidence is a function
// of |rs(i)| instead so that it is never negative:
val confidence = 1 + alpha * abs(rs(i))
SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
// For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
// means we try to reconstruct 0. We add terms only where P = 1, so, term below
// is now only added for rs(i) > 0:
if (rs(i) > 0) {
SimpleBlas.axpy(confidence, x, userXy(us(i)))
}
}
}
}
}

// Solve the least-squares problem for each user and return the new feature vectors
userXtX.zipWithIndex.map{ case (triangularXtX, index) =>
Array.range(0, numUsers).map { index =>
// Compute the full XtX matrix from the lower-triangular part we got above
fillFullMatrix(triangularXtX, fullXtX)
fillFullMatrix(userXtX(index), fullXtX)
// Add regularization
(0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
// Solve the resulting matrix, which is symmetric and positive-definite
implicitPrefs match {
case false => Solve.solvePositive(fullXtX, userXy(index)).data
case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data
}
}
}

/**
* Set xtxDest to the lower-triangular part of x transpose * x. For efficiency in summing
* these matrices, we store xtxDest as only rank * (rank+1) / 2 values, namely the values
* at (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), etc in that order.
*/
private def fillXtX(x: DoubleMatrix, xtxDest: DoubleMatrix) {
var i = 0
var pos = 0
while (i < x.length) {
var j = 0
while (j <= i) {
xtxDest.data(pos) = x.data(i) * x.data(j)
pos += 1
j += 1
case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data
}
i += 1
}
}

Expand All @@ -436,9 +470,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l


/**
* Top-level methods for calling Alternating Least Squares (ALS) matrix factorizaton.
* Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
*/
object ALS {

/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
Expand All @@ -451,15 +486,39 @@ object ALS {
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param seed random seed
*/
def train(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int)
: MatrixFactorizationModel =
{
blocks: Int,
seed: Long
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
}

/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
* product of two lower-rank matrices of a given rank (number of features). To solve for these
* features, we run a given number of iterations of ALS. This is done using a level of
* parallelism given by `blocks`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
*/
def train(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
}

Expand All @@ -476,8 +535,7 @@ object ALS {
* @param lambda regularization factor (recommended: 0.01)
*/
def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double)
: MatrixFactorizationModel =
{
: MatrixFactorizationModel = {
train(ratings, rank, iterations, lambda, -1)
}

Expand All @@ -493,8 +551,7 @@ object ALS {
* @param iterations number of iterations of ALS (recommended: 10-20)
*/
def train(ratings: RDD[Rating], rank: Int, iterations: Int)
: MatrixFactorizationModel =
{
: MatrixFactorizationModel = {
train(ratings, rank, iterations, 0.01, -1)
}

Expand All @@ -511,16 +568,42 @@ object ALS {
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param alpha confidence parameter (only applies when immplicitPrefs = true)
* @param seed random seed
*/
def trainImplicit(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double)
: MatrixFactorizationModel =
{
alpha: Double,
seed: Long
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
}

/**
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users
* to some products, in the form of (userID, productID, preference) pairs. We approximate the
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
* To solve for these features, we run a given number of iterations of ALS. This is done using
* a level of parallelism given by `blocks`.
*
* @param ratings RDD of (userID, productID, rating) pairs
* @param rank number of features to use
* @param iterations number of iterations of ALS (recommended: 10-20)
* @param lambda regularization factor (recommended: 0.01)
* @param blocks level of parallelism to split computation into
* @param alpha confidence parameter (only applies when immplicitPrefs = true)
*/
def trainImplicit(
ratings: RDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double
): MatrixFactorizationModel = {
new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
}

Expand All @@ -537,8 +620,7 @@ object ALS {
* @param lambda regularization factor (recommended: 0.01)
*/
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
: MatrixFactorizationModel =
{
: MatrixFactorizationModel = {
trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
}

Expand All @@ -555,8 +637,7 @@ object ALS {
* @param iterations number of iterations of ALS (recommended: 10-20)
*/
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int)
: MatrixFactorizationModel =
{
: MatrixFactorizationModel = {
trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
*/
def run(input: RDD[LabeledPoint]) : M = {
val nfeatures: Int = input.first().features.length
val initialWeights = Array.fill(nfeatures)(1.0)
val initialWeights = new Array[Double](nfeatures)
run(input, initialWeights)
}

Expand All @@ -134,15 +134,15 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
throw new SparkException("Input validation failed.")
}

// Add a extra variable consisting of all 1.0's for the intercept.
// Prepend an extra variable consisting of all 1.0's for the intercept.
val data = if (addIntercept) {
input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*)))
input.map(labeledPoint => (labeledPoint.label, labeledPoint.features.+:(1.0)))
} else {
input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
}

val initialWeightsWithIntercept = if (addIntercept) {
Array(1.0, initialWeights:_*)
initialWeights.+:(1.0)
} else {
initialWeights
}
Expand Down
Loading

0 comments on commit 98c3e98

Please sign in to comment.