Skip to content

Commit

Permalink
Use None/Some instead of Iterator.empty/Iterator.single, and negate the count instead of the value
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengruifeng committed Nov 18, 2020
1 parent e0605d6 commit 5875c65
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,13 @@ class Imputer @Since("2.2.0") (@Since("2.2.0") override val uid: String)
// If there is more than one mode, choose the smallest one to keep in line
// with sklearn.impute.SimpleImputer (using scipy.stats.mode).
val modes = dataset.select(cols: _*).flatMap { row =>
Iterator.range(0, numCols).flatMap { i =>
// Ignore null.
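// Negate the value so that the default ordering of the (count: Long, value: Double) struct,
// combined with max, breaks ties between equally frequent values in favor of the smallest one.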
if (row.isNullAt(i)) Iterator.empty else Iterator.single((i, -row.getDouble(i)))
}
}.toDF("index", "negative_value")
.groupBy("index", "negative_value").agg(count(lit(0)).as("count"))
.groupBy("index").agg(max(struct("count", "negative_value")).as("mode"))
.select(col("index"), negate(col("mode.negative_value")))
// Ignore null.
Iterator.range(0, numCols)
.flatMap(i => if (row.isNullAt(i)) None else Some((i, row.getDouble(i))))
}.toDF("index", "value")
.groupBy("index", "value").agg(negate(count(lit(0))).as("negative_count"))
.groupBy("index").agg(min(struct("negative_count", "value")).as("mode"))
.select("index", "mode.value")
.as[(Int, Double)].collect().toMap
Array.tabulate(numCols)(i => modes.getOrElse(i, Double.NaN))
}
Expand Down

0 comments on commit 5875c65

Please sign in to comment.