[SPARK-18408][ML] API Improvements for LSH #15874

Closed
wants to merge 24 commits
Changes from 18 commits

Commits (24)
559c099
[SPARK-18334] MinHash should use binary hash distance
Nov 7, 2016
517a97b
Remove misleading documentation as requested
Yunni Nov 8, 2016
b546dbd
Add warning for multi-probe in MinHash
Nov 8, 2016
a3cd928
Merge branch 'SPARK-18334-yunn-minhash-bug' of https://github.com/Yun…
Nov 8, 2016
c8243c7
(1) Fix documentation as CR suggested (2) Fix typo in unit test
Nov 9, 2016
6aac8b3
Fix typo in unit test
Nov 9, 2016
9870743
[SPARK-18408] API Improvements for LSH
Nov 14, 2016
0e9250b
(1) Fix description for numHashFunctions (2) Make numEntries in MinHa…
Nov 14, 2016
adbbefe
Add assertion for hashFunction in BucketedRandomProjectionLSHSuite
Nov 14, 2016
c115ed3
Revert AND-amplification for a future PR
Nov 14, 2016
033ae5d
Code Review Comments
Nov 15, 2016
c597f4c
Add unit tests to run on Jenkins.
Nov 16, 2016
d759875
Add unit tests to run on Jenkins.
Nov 16, 2016
596eb06
CR comments
Nov 17, 2016
00d08bf
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
Nov 17, 2016
3d0810f
Update comments
Nov 17, 2016
257ef19
Add scaladoc for approximately min-wise independence
Yunni Nov 18, 2016
2c264b7
Change documentation reference
Yunni Nov 18, 2016
36ca278
Removing modulo numEntries
Nov 19, 2016
4508393
Merge branch 'SPARK-18408-yunn-api-improvements' of https://github.co…
Nov 19, 2016
939e9d5
Code Review Comments
Nov 22, 2016
8b9403d
Minimize the test cases by directly using artificial models
Nov 22, 2016
f0ebcb7
Code review comments
Nov 22, 2016
e198080
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
Yunni Nov 28, 2016
@@ -34,9 +34,9 @@ import org.apache.spark.sql.types.StructType
/**
* :: Experimental ::
*
* Params for [[RandomProjection]].
* Params for [[BucketedRandomProjectionLSH]].
*/
private[ml] trait RandomProjectionParams extends Params {
private[ml] trait BucketedRandomProjectionLSHParams extends Params {

/**
* The length of each hash bucket, a larger bucket lowers the false negative rate. The number of
@@ -58,8 +58,8 @@ private[ml] trait RandomProjectionParams extends Params {
/**
* :: Experimental ::
*
* Model produced by [[RandomProjection]], where multiple random vectors are stored. The vectors
* are normalized to be unit vectors and each vector is used in a hash function:
* Model produced by [[BucketedRandomProjectionLSH]], where multiple random vectors are stored. The
* vectors are normalized to be unit vectors and each vector is used in a hash function:
* `h_i(x) = floor(r_i.dot(x) / bucketLength)`
* where `r_i` is the i-th random unit vector. The number of buckets will be `(max L2 norm of input
* vectors) / bucketLength`.
@@ -68,18 +68,19 @@ private[ml] trait RandomProjectionParams extends Params {
*/
@Experimental
@Since("2.1.0")
class RandomProjectionModel private[ml] (
class BucketedRandomProjectionLSHModel private[ml](
override val uid: String,
@Since("2.1.0") val randUnitVectors: Array[Vector])
extends LSHModel[RandomProjectionModel] with RandomProjectionParams {
private[ml] val randUnitVectors: Array[Vector])
extends LSHModel[BucketedRandomProjectionLSHModel] with BucketedRandomProjectionLSHParams {

@Since("2.1.0")
override protected[ml] val hashFunction: (Vector) => Vector = {
override protected[ml] val hashFunction: Vector => Array[Vector] = {
key: Vector => {
val hashValues: Array[Double] = randUnitVectors.map({
randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength))
})
Vectors.dense(hashValues)
// TODO: Output vectors of dimension numHashFunctions in SPARK-18450
hashValues.grouped(1).map(Vectors.dense).toArray
}
}

@@ -89,27 +90,29 @@ class RandomProjectionModel private[ml] (
}

@Since("2.1.0")
override protected[ml] def hashDistance(x: Vector, y: Vector): Double = {
override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = {
// Since it's generated by hashing, it will be a pair of dense vectors.
x.toDense.values.zip(y.toDense.values).map(pair => math.abs(pair._1 - pair._2)).min
x.zip(y).map(vectorPair => Vectors.sqdist(vectorPair._1, vectorPair._2)).min
}

@Since("2.1.0")
override def copy(extra: ParamMap): this.type = defaultCopy(extra)

@Since("2.1.0")
override def write: MLWriter = new RandomProjectionModel.RandomProjectionModelWriter(this)
override def write: MLWriter = {
new BucketedRandomProjectionLSHModel.BucketedRandomProjectionLSHModelWriter(this)
}
}
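
As a side note on the hashDistance change above, a minimal sketch of the new semantics (the signature values are made up; it assumes org.apache.spark.ml.linalg.Vectors is in scope):

// Two hash signatures over three hash tables, one single-element vector per table.
val a = Seq(Vectors.dense(1.0), Vectors.dense(4.0), Vectors.dense(-2.0))
val b = Seq(Vectors.dense(3.0), Vectors.dense(4.0), Vectors.dense(0.0))
// Per-table squared distances are 4.0, 0.0 and 4.0; the minimum (0.0) is returned,
// so two signatures count as close if they agree in any single hash table
// (the OR-amplification behavior described in LSH.scala below).
val dist = a.zip(b).map { case (u, v) => Vectors.sqdist(u, v) }.min  // 0.0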

/**
* :: Experimental ::
*
* This [[RandomProjection]] implements Locality Sensitive Hashing functions for Euclidean
* distance metrics.
* This [[BucketedRandomProjectionLSH]] implements Locality Sensitive Hashing functions for
* Euclidean distance metrics.
*
* The input is dense or sparse vectors, each of which represents a point in the Euclidean
* distance space. The output will be vectors of configurable dimension. Hash value in the same
* dimension is calculated by the same hash function.
* distance space. The output will be vectors of configurable dimension. Hash values in the
* same dimension are calculated by the same hash function.
*
* References:
*
@@ -121,8 +124,9 @@ class RandomProjectionModel private[ml] (
*/
@Experimental
@Since("2.1.0")
class RandomProjection(override val uid: String) extends LSH[RandomProjectionModel]
with RandomProjectionParams with HasSeed {
class BucketedRandomProjectionLSH(override val uid: String)
extends LSH[BucketedRandomProjectionLSHModel]
with BucketedRandomProjectionLSHParams with HasSeed {

@Since("2.1.0")
override def setInputCol(value: String): this.type = super.setInputCol(value)
@@ -131,11 +135,11 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
override def setOutputCol(value: String): this.type = super.setOutputCol(value)

@Since("2.1.0")
override def setOutputDim(value: Int): this.type = super.setOutputDim(value)
override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value)

@Since("2.1.0")
def this() = {
this(Identifiable.randomUID("random projection"))
this(Identifiable.randomUID("brp-lsh"))
}

/** @group setParam */
@@ -147,15 +151,16 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
def setSeed(value: Long): this.type = set(seed, value)

@Since("2.1.0")
override protected[this] def createRawLSHModel(inputDim: Int): RandomProjectionModel = {
override protected[this] def createRawLSHModel(
inputDim: Int): BucketedRandomProjectionLSHModel = {
val rand = new Random($(seed))
val randUnitVectors: Array[Vector] = {
Array.fill($(outputDim)) {
Array.fill($(numHashTables)) {
val randArray = Array.fill(inputDim)(rand.nextGaussian())
Vectors.fromBreeze(normalize(breeze.linalg.Vector(randArray)))
}
}
new RandomProjectionModel(uid, randUnitVectors)
new BucketedRandomProjectionLSHModel(uid, randUnitVectors)
}

@Since("2.1.0")
@@ -169,23 +174,25 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
}

@Since("2.1.0")
object RandomProjection extends DefaultParamsReadable[RandomProjection] {
object BucketedRandomProjectionLSH extends DefaultParamsReadable[BucketedRandomProjectionLSH] {

@Since("2.1.0")
override def load(path: String): RandomProjection = super.load(path)
override def load(path: String): BucketedRandomProjectionLSH = super.load(path)
}

@Since("2.1.0")
object RandomProjectionModel extends MLReadable[RandomProjectionModel] {
object BucketedRandomProjectionLSHModel extends MLReadable[BucketedRandomProjectionLSHModel] {

@Since("2.1.0")
override def read: MLReader[RandomProjectionModel] = new RandomProjectionModelReader
override def read: MLReader[BucketedRandomProjectionLSHModel] = {
new BucketedRandomProjectionLSHModelReader
}

@Since("2.1.0")
override def load(path: String): RandomProjectionModel = super.load(path)
override def load(path: String): BucketedRandomProjectionLSHModel = super.load(path)

private[RandomProjectionModel] class RandomProjectionModelWriter(instance: RandomProjectionModel)
extends MLWriter {
private[BucketedRandomProjectionLSHModel] class BucketedRandomProjectionLSHModelWriter(
instance: BucketedRandomProjectionLSHModel) extends MLWriter {

// TODO: Save using the existing format of Array[Vector] once SPARK-12878 is resolved.
private case class Data(randUnitVectors: Matrix)
@@ -203,20 +210,22 @@ object RandomProjectionModel extends MLReadable[RandomProjectionModel] {
}
}

private class RandomProjectionModelReader extends MLReader[RandomProjectionModel] {
private class BucketedRandomProjectionLSHModelReader
extends MLReader[BucketedRandomProjectionLSHModel] {

/** Checked against metadata when loading model */
private val className = classOf[RandomProjectionModel].getName
private val className = classOf[BucketedRandomProjectionLSHModel].getName

override def load(path: String): RandomProjectionModel = {
override def load(path: String): BucketedRandomProjectionLSHModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)

val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath)
val Row(randUnitVectors: Matrix) = MLUtils.convertMatrixColumnsToML(data, "randUnitVectors")
.select("randUnitVectors")
.head()
val model = new RandomProjectionModel(metadata.uid, randUnitVectors.rowIter.toArray)
val model = new BucketedRandomProjectionLSHModel(metadata.uid,
randUnitVectors.rowIter.toArray)

DefaultParamsReader.getAndSetParams(model, metadata)
model
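For readers tracking the rename, a minimal usage sketch of the new estimator API (the toy data, column names, and parameter values are illustrative only, not taken from this PR; it assumes a SparkSession named spark is in scope):

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

// Toy two-dimensional points; "keys" is an arbitrary input column name.
val df = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(1.0, 1.0)),
  Tuple1(Vectors.dense(1.0, -1.0)),
  Tuple1(Vectors.dense(-1.0, -1.0))
)).toDF("keys")

val brp = new BucketedRandomProjectionLSH()
  .setNumHashTables(3)      // renamed from setOutputDim in this PR
  .setBucketLength(2.0)
  .setInputCol("keys")
  .setOutputCol("hashes")
  .setSeed(12345)

val model = brp.fit(df)
// The output column now holds an array of single-element hash vectors, one per hash table.
model.transform(df).show(truncate = false)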
102 changes: 57 additions & 45 deletions mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
@@ -33,28 +33,28 @@ import org.apache.spark.sql.types._
*/
private[ml] trait LSHParams extends HasInputCol with HasOutputCol {
/**
* Param for the dimension of LSH OR-amplification.
* Param for the number of hash tables used in LSH OR-amplification.
*
* In this implementation, we use LSH OR-amplification to reduce the false negative rate. The
* higher the dimension is, the lower the false negative rate.
* LSH OR-amplification can be used to reduce the false negative rate. Higher values for this
* param lead to a reduced false negative rate, at the expense of added computational complexity.
* @group param
*/
final val outputDim: IntParam = new IntParam(this, "outputDim", "output dimension, where" +
" increasing dimensionality lowers the false negative rate, and decreasing dimensionality" +
" improves the running performance", ParamValidators.gt(0))
final val numHashTables: IntParam = new IntParam(this, "numHashTables", "number of hash " +
"tables, where increasing number of hash tables lowers the false negative rate, and " +
"decreasing it improves the running performance", ParamValidators.gt(0))

/** @group getParam */
final def getOutputDim: Int = $(outputDim)
final def getNumHashTables: Int = $(numHashTables)

setDefault(outputDim -> 1)
setDefault(numHashTables -> 1)

/**
* Transform the Schema for LSH
* @param schema The schema of the input dataset without [[outputCol]]
* @return A derived schema with [[outputCol]] added
*/
protected[this] final def validateAndTransformSchema(schema: StructType): StructType = {
SchemaUtils.appendColumn(schema, $(outputCol), new VectorUDT)
SchemaUtils.appendColumn(schema, $(outputCol), DataTypes.createArrayType(new VectorUDT))
}
}
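
Continuing the sketch above, a quick way to see this schema change in practice (the printSchema output shown is an approximation, with nullability flags omitted):

model.transform(df).printSchema()
// root
//  |-- keys: vector
//  |-- hashes: array
//  |    |-- element: vector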

@@ -66,10 +66,10 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
self: T =>

/**
* The hash function of LSH, mapping a predefined KeyType to a Vector
* The hash function of LSH, mapping an input feature vector to multiple hash vectors.
* @return The mapping of LSH function.
*/
protected[ml] val hashFunction: Vector => Vector
protected[ml] val hashFunction: Vector => Array[Vector]

/**
* Calculate the distance between two different keys using the distance metric corresponding
@@ -87,41 +87,24 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
* @param y Another hash vector
* @return The distance between hash vectors x and y
*/
protected[ml] def hashDistance(x: Vector, y: Vector): Double
protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double

override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val transformUDF = udf(hashFunction, new VectorUDT)
val transformUDF = udf(hashFunction, DataTypes.createArrayType(new VectorUDT))
dataset.withColumn($(outputCol), transformUDF(dataset($(inputCol))))
}

override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
}

/**
* Given a large dataset and an item, approximately find at most k items which have the closest
* distance to the item. If the [[outputCol]] is missing, the method will transform the data; if
* the [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the
* transformed data when necessary.
*
* This method implements two ways of fetching k nearest neighbors:
* - Single Probing: Fast, return at most k elements (Probing only one buckets)
* - Multiple Probing: Slow, return exact k elements (Probing multiple buckets close to the key)
*
* @param dataset the dataset to search for nearest neighbors of the key
* @param key Feature vector representing the item to search for
* @param numNearestNeighbors The maximum number of nearest neighbors
* @param singleProbing True for using Single Probing; false for multiple probing
* @param distCol Output column for storing the distance between each result row and the key
* @return A dataset containing at most k items closest to the key. A distCol is added to show
* the distance between each row and the key.
*/
def approxNearestNeighbors(
// TODO: Fix the MultiProbe NN Search in SPARK-18454
private[feature] def approxNearestNeighbors(
dataset: Dataset[_],
key: Vector,
numNearestNeighbors: Int,
singleProbing: Boolean,
singleProbe: Boolean,
distCol: String): Dataset[_] = {
require(numNearestNeighbors > 0, "The number of nearest neighbors cannot be less than 1")
// Get Hash Value of the key
@@ -132,14 +115,24 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
dataset.toDF()
}

// In the origin dataset, find the hash value that is closest to the key
val hashDistUDF = udf((x: Vector) => hashDistance(x, keyHash), DataTypes.DoubleType)
val hashDistCol = hashDistUDF(col($(outputCol)))
val modelSubset = if (singleProbe) {
def sameBucket(x: Seq[Vector], y: Seq[Vector]): Boolean = {
x.zip(y).exists(tuple => tuple._1 == tuple._2)
}

// In the original dataset, find the hash values that fall in the same bucket as the key
val sameBucketWithKeyUDF = udf((x: Seq[Vector]) =>
sameBucket(x, keyHash), DataTypes.BooleanType)

val modelSubset = if (singleProbing) {
modelDataset.filter(hashDistCol === 0.0)
modelDataset.filter(sameBucketWithKeyUDF(col($(outputCol))))
} else {
// In the original dataset, find the hash value that is closest to the key
// Limit the use of hashDist since it's controversial
val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash), DataTypes.DoubleType)
val hashDistCol = hashDistUDF(col($(outputCol)))

// Compute threshold to get exact k elements.
// TODO: SPARK-18409: Use approxQuantile to get the threshold
val modelDatasetSortedByHash = modelDataset.sort(hashDistCol).limit(numNearestNeighbors)
val thresholdDataset = modelDatasetSortedByHash.select(max(hashDistCol))
val hashThreshold = thresholdDataset.take(1).head.getDouble(0)
@@ -155,8 +148,30 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
}

/**
* Overloaded method for approxNearestNeighbors. Use Single Probing as default way to search
* nearest neighbors and "distCol" as default distCol.
* Given a large dataset and an item, approximately find at most k items which have the closest
* distance to the item. If the [[outputCol]] is missing, the method will transform the data; if
* the [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the
* transformed data when necessary.
*
* NOTE: This method is experimental and will likely change behavior in the next release.

Contributor: minor: use @note

Contributor Author: Done.

*
* @param dataset the dataset to search for nearest neighbors of the key

Contributor: nit: Capitalize first words and add periods to all fields

Contributor Author: Done.

* @param key Feature vector representing the item to search for
* @param numNearestNeighbors The maximum number of nearest neighbors
* @param distCol Output column for storing the distance between each result row and the key
* @return A dataset containing at most k items closest to the key. A distCol is added to show

Contributor: minor: A column "distCol" is added ...

Contributor Author: Done.

* the distance between each row and the key.
*/
def approxNearestNeighbors(
dataset: Dataset[_],
key: Vector,
numNearestNeighbors: Int,
distCol: String): Dataset[_] = {
approxNearestNeighbors(dataset, key, numNearestNeighbors, true, distCol)
}
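
A brief example of calling the public overload above, continuing the hypothetical model and df from the earlier sketch (the query point and k are arbitrary):

// Find at most 2 rows of df approximately nearest to the query key;
// the distance to the key is written to the "distCol" column of the result.
val key = Vectors.dense(1.0, 0.0)
val neighbors = model.approxNearestNeighbors(df, key, 2, "distCol")
neighbors.show(truncate = false)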

/**
* Overloaded method for approxNearestNeighbors. Use "distCol" as default distCol.
*/
def approxNearestNeighbors(
dataset: Dataset[_],
@@ -179,16 +194,13 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
inputName: String,
explodeCols: Seq[String]): Dataset[_] = {
require(explodeCols.size == 2, "explodeCols must be two strings.")
val vectorToMap = udf((x: Vector) => x.asBreeze.iterator.toMap,
MapType(DataTypes.IntegerType, DataTypes.DoubleType))
val modelDataset: DataFrame = if (!dataset.columns.contains($(outputCol))) {
transform(dataset)
} else {
dataset.toDF()
}
modelDataset.select(
struct(col("*")).as(inputName),
explode(vectorToMap(col($(outputCol)))).as(explodeCols))
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))

Contributor: Well here's a fun one. When I run this test:

  test("memory leak test") {
    val numDim = 50
    val data = {
      for (i <- 0 until numDim; j <- Seq(-2, -1, 1, 2))
        yield Vectors.sparse(numDim, Seq((i, j.toDouble)))
    }
    val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")

    // Project from 100 dimensional Euclidean Space to 10 dimensions
    val brp = new BucketedRandomProjectionLSH()
      .setNumHashTables(10)
      .setInputCol("keys")
      .setOutputCol("values")
      .setBucketLength(2.5)
      .setSeed(12345)
    val model = brp.fit(df)
    val joined = model.approxSimilarityJoin(df, df, Double.MaxValue, "distCol")
    joined.show()
}

I get the following error:

[info] - BucketedRandomProjectionLSH with high dimension data: test of LSH property *** FAILED *** (7 seconds, 568 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 205, localhost, executor driver): org.apache.spark.SparkException: Managed memory leak detected; size = 33816576 bytes, TID = 205
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:295)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]  at java.lang.Thread.run(Thread.java:745)

Could you run the same test and see if you get an error?

Contributor Author (@Yunni, Nov 15, 2016): I did not get the same error, and the result shows successfully. Could you provide me with the full stack of the Exception?

Contributor: Yeah I still get it. Did you use the code above? It's not directly copy pasted from the existing tests.

- memory leak test *** FAILED *** (8 seconds, 938 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 204, localhost, executor driver): org.apache.spark.SparkException: Managed memory leak detected; size = 33816576 bytes, TID = 204
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:295)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]  at java.lang.Thread.run(Thread.java:745)
[info] 
[info] Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[info]   at scala.Option.foreach(Option.scala:257)
[info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
[info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
[info]   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[info]   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1896)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1909)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
[info]   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
[info]   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
[info]   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2323)
[info]   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
[info]   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2717)
[info]   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2322)
[info]   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2329)
[info]   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2065)
[info]   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2064)
[info]   at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2747)
[info]   at org.apache.spark.sql.Dataset.head(Dataset.scala:2064)
[info]   at org.apache.spark.sql.Dataset.take(Dataset.scala:2279)
[info]   at org.apache.spark.sql.Dataset.showString(Dataset.scala:247)
[info]   at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
[info]   at org.apache.spark.sql.Dataset.show(Dataset.scala:555)
[info]   at org.apache.spark.sql.Dataset.show(Dataset.scala:564)
[info]   at org.apache.spark.ml.feature.BucketedRandomProjectionLSHSuite$$anonfun$3.apply$mcV$sp(BucketedRandomProjectionLSHSuite.scala:74)

Contributor Author (@Yunni, Nov 16, 2016): Yes, I copied your code to BucketedRandomProjectionLSHSuite.scala and it runs fine for me with the following output:

+--------------------+--------------------+------------------+
|            datasetA|            datasetB|           distCol|
+--------------------+--------------------+------------------+
|[(50,[0],[-2.0]),...|[(50,[0],[-1.0]),...|               1.0|
|[(50,[4],[-1.0]),...|[(50,[23],[-1.0])...|1.4142135623730951|
|[(50,[5],[-1.0]),...|[(50,[32],[-1.0])...|1.4142135623730951|
|[(50,[7],[1.0]),W...|[(50,[47],[1.0]),...|1.4142135623730951|
|[(50,[7],[2.0]),W...|[(50,[26],[-2.0])...|2.8284271247461903|
|[(50,[8],[-2.0]),...|[(50,[1],[-1.0]),...|  2.23606797749979|
|[(50,[8],[-1.0]),...|[(50,[23],[-2.0])...|  2.23606797749979|
|[(50,[10],[-1.0])...|[(50,[7],[2.0]),W...|  2.23606797749979|
|[(50,[10],[-1.0])...|[(50,[13],[2.0]),...|  2.23606797749979|
|[(50,[11],[-1.0])...|[(50,[39],[2.0]),...|  2.23606797749979|
|[(50,[12],[-2.0])...|[(50,[28],[1.0]),...|  2.23606797749979|
|[(50,[12],[-2.0])...|[(50,[29],[-1.0])...|  2.23606797749979|
|[(50,[13],[1.0]),...|[(50,[2],[-2.0]),...|  2.23606797749979|
|[(50,[14],[1.0]),...|[(50,[33],[2.0]),...|  2.23606797749979|
|[(50,[14],[2.0]),...|[(50,[28],[2.0]),...|2.8284271247461903|
|[(50,[15],[-1.0])...|[(50,[38],[-1.0])...|1.4142135623730951|
|[(50,[18],[1.0]),...|[(50,[8],[-1.0]),...|1.4142135623730951|
|[(50,[18],[1.0]),...|[(50,[12],[-2.0])...|  2.23606797749979|
|[(50,[18],[2.0]),...|[(50,[43],[1.0]),...|  2.23606797749979|
|[(50,[20],[1.0]),...|[(50,[25],[-1.0])...|1.4142135623730951|
+--------------------+--------------------+------------------+
only showing top 20 rows

Let me see if the test can pass jenkins or not.

Contributor Author: If you look at line 292 of Executor.scala, it shows this is just an OOM exception from the DataFrame. That's why it behaves differently on our machines and on Jenkins.
model.approxSimilarityJoin(df, df, Double.MaxValue, "distCol") returns nearly 40,000 rows when threshold = Double.MaxValue.
If you reduce numDim, the test will pass.

Contributor: See #15916

}

/**
@@ -293,7 +305,7 @@ private[ml] abstract class LSH[T <: LSHModel[T]]
def setOutputCol(value: String): this.type = set(outputCol, value)

/** @group setParam */
def setOutputDim(value: Int): this.type = set(outputDim, value)
def setNumHashTables(value: Int): this.type = set(numHashTables, value)

/**
* Validate and create a new instance of concrete LSHModel. Because different LSHModel may have