Merge branch 'master' into SPARK-18775
rxin committed Dec 8, 2016
2 parents 3199f8f + b47b892 commit f77730f
Showing 40 changed files with 665 additions and 332 deletions.
105 changes: 37 additions & 68 deletions R/pkg/R/mllib.R
@@ -733,9 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"),
#' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p
#' is the original probability of that class and t is the class's threshold.
#' @param weightCol The weight column name.
#' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions of features or the number of partitions
#' are large, this param could be adjusted to a larger size.
#' @param probabilityCol column name for predicted class conditional probabilities.
#' @param ... additional arguments passed to the method.
#' @return \code{spark.logit} returns a fitted logistic regression model
#' @rdname spark.logit
@@ -746,45 +743,35 @@ setMethod("predict", signature(object = "KMeansModel"),
#' \dontrun{
#' sparkR.session()
#' # binary logistic regression
#' label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
#' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
#' binary_data <- as.data.frame(cbind(label, features))
#' binary_df <- createDataFrame(binary_data)
#' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0)
#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
#'
#' # summary of binary logistic regression
#' blr_summary <- summary(blr_model)
#' blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure"))
#' df <- createDataFrame(iris)
#' training <- df[df$Species %in% c("versicolor", "virginica"), ]
#' model <- spark.logit(training, Species ~ ., regParam = 0.5)
#' summary <- summary(model)
#'
#' # fitted values on training data
#' fitted <- predict(model, training)
#'
#' # save fitted model to input path
#' path <- "path/to/model"
#' write.ml(blr_model, path)
#' write.ml(model, path)
#'
#' # can also read back the saved model and predict
#' # Note that summary does not work on loaded model
#' savedModel <- read.ml(path)
#' blr_predict2 <- collect(select(predict(savedModel, binary_df), "prediction"))
#' summary(savedModel)
#'
#' # multinomial logistic regression
#'
#' label <- c(0.0, 1.0, 2.0, 0.0, 0.0)
#' feature1 <- c(4.845940, 5.64480, 7.430381, 6.464263, 5.555667)
#' feature2 <- c(2.941319, 2.614812, 2.162451, 3.339474, 2.970987)
#' feature3 <- c(1.322733, 1.348044, 3.861237, 9.686976, 3.447130)
#' feature4 <- c(1.3246388, 0.5510444, 0.9225810, 1.2147881, 1.6020842)
#' data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4))
#' df <- createDataFrame(data)
#' df <- createDataFrame(iris)
#' model <- spark.logit(df, Species ~ ., regParam = 0.5)
#' summary <- summary(model)
#'
#' # Note that summary of multinomial logistic regression is not implemented yet
#' model <- spark.logit(df, label ~ ., family = "multinomial", thresholds = c(0, 1, 1))
#' predict1 <- collect(select(predict(model, df), "prediction"))
#' }
#' @note spark.logit since 2.1.0
setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
tol = 1E-6, family = "auto", standardization = TRUE,
thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
probabilityCol = "probability") {
thresholds = 0.5, weightCol = NULL) {
formula <- paste(deparse(formula), collapse = "")

if (is.null(weightCol)) {
@@ -796,8 +783,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
as.numeric(elasticNetParam), as.integer(maxIter),
as.numeric(tol), as.character(family),
as.logical(standardization), as.array(thresholds),
as.character(weightCol), as.integer(aggregationDepth),
as.character(probabilityCol))
as.character(weightCol))
new("LogisticRegressionModel", jobj = jobj)
})
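
The hunk above removes aggregationDepth and probabilityCol from the R wrapper, so spark.logit now forwards only the remaining arguments to the JVM side. A minimal usage sketch against the trimmed signature, assuming a running SparkR session and the iris data from the roxygen examples (illustrative only, not part of the diff):

library(SparkR)
sparkR.session()

df <- createDataFrame(iris)
training <- df[df$Species %in% c("versicolor", "virginica"), ]

# Only the arguments still exposed by the wrapper are passed here.
model <- spark.logit(training, Species ~ ., regParam = 0.5, maxIter = 100,
                     thresholds = 0.5)
head(select(predict(model, training), "prediction"))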

@@ -817,44 +803,29 @@ setMethod("predict", signature(object = "LogisticRegressionModel"),
# Get the summary of a LogisticRegressionModel

#' @param object a LogisticRegressionModel fitted by \code{spark.logit}
#' @return \code{summary} returns the binary logistic regression results of a given model as a list,
#' including roc, areaUnderROC, pr, fMeasureByThreshold, precisionByThreshold,
#' recallByThreshold, totalIterations, objectiveHistory. Note that the multinomial logistic
#' regression summary is not yet available.
#' @return \code{summary} returns the coefficients matrix of the fitted model
#' @rdname spark.logit
#' @aliases summary,LogisticRegressionModel-method
#' @export
#' @note summary(LogisticRegressionModel) since 2.1.0
setMethod("summary", signature(object = "LogisticRegressionModel"),
function(object) {
jobj <- object@jobj
is.loaded <- callJMethod(jobj, "isLoaded")

if (is.loaded) {
stop("Loaded model doesn't have training summary.")
features <- callJMethod(jobj, "rFeatures")
labels <- callJMethod(jobj, "labels")
coefficients <- callJMethod(jobj, "rCoefficients")
nCol <- length(coefficients) / length(features)
coefficients <- matrix(coefficients, ncol = nCol)
# If nCol == 1, this is a binomial logistic regression model with pivoting.
# Otherwise, it's a multinomial logistic regression model without pivoting.
if (nCol == 1) {
colnames(coefficients) <- c("Estimate")
} else {
colnames(coefficients) <- unlist(labels)
}
rownames(coefficients) <- unlist(features)

roc <- dataFrame(callJMethod(jobj, "roc"))

areaUnderROC <- callJMethod(jobj, "areaUnderROC")

pr <- dataFrame(callJMethod(jobj, "pr"))

fMeasureByThreshold <- dataFrame(callJMethod(jobj, "fMeasureByThreshold"))

precisionByThreshold <- dataFrame(callJMethod(jobj, "precisionByThreshold"))

recallByThreshold <- dataFrame(callJMethod(jobj, "recallByThreshold"))

totalIterations <- callJMethod(jobj, "totalIterations")

objectiveHistory <- callJMethod(jobj, "objectiveHistory")

list(roc = roc, areaUnderROC = areaUnderROC, pr = pr,
fMeasureByThreshold = fMeasureByThreshold,
precisionByThreshold = precisionByThreshold,
recallByThreshold = recallByThreshold,
totalIterations = totalIterations, objectiveHistory = objectiveHistory)
list(coefficients = coefficients)
})
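
With this rewrite, summary() on a LogisticRegressionModel returns a one-element list holding a coefficients matrix instead of the old binary metrics (roc, areaUnderROC, and so on). A short sketch of consuming the new return value, assuming the model fitted in the previous sketch:

s <- summary(model)
# Rows are the feature names reported by the wrapper; a binomial fit has a
# single "Estimate" column, a multinomial fit one column per class label.
print(s$coefficients)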

#' Multilayer Perceptron Classification Model
@@ -1453,7 +1424,7 @@ setMethod("predict", signature(object = "GaussianMixtureModel"),
#' @param userCol column name for user ids. Ids must be (or can be coerced into) integers.
#' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers.
#' @param rank rank of the matrix factorization (> 0).
#' @param reg regularization parameter (>= 0).
#' @param regParam regularization parameter (>= 0).
#' @param maxIter maximum number of iterations (>= 0).
#' @param nonnegative logical value indicating whether to apply nonnegativity constraints.
#' @param implicitPrefs logical value indicating whether to use implicit preference.
@@ -1492,29 +1463,29 @@ setMethod("predict", signature(object = "GaussianMixtureModel"),
#'
#' # set other arguments
#' modelS <- spark.als(df, "rating", "user", "item", rank = 20,
#' reg = 0.1, nonnegative = TRUE)
#' regParam = 0.1, nonnegative = TRUE)
#' statsS <- summary(modelS)
#' }
#' @note spark.als since 2.1.0
setMethod("spark.als", signature(data = "SparkDataFrame"),
function(data, ratingCol = "rating", userCol = "user", itemCol = "item",
rank = 10, reg = 0.1, maxIter = 10, nonnegative = FALSE,
rank = 10, regParam = 0.1, maxIter = 10, nonnegative = FALSE,
implicitPrefs = FALSE, alpha = 1.0, numUserBlocks = 10, numItemBlocks = 10,
checkpointInterval = 10, seed = 0) {

if (!is.numeric(rank) || rank <= 0) {
stop("rank should be a positive number.")
}
if (!is.numeric(reg) || reg < 0) {
stop("reg should be a nonnegative number.")
if (!is.numeric(regParam) || regParam < 0) {
stop("regParam should be a nonnegative number.")
}
if (!is.numeric(maxIter) || maxIter <= 0) {
stop("maxIter should be a positive number.")
}

jobj <- callJStatic("org.apache.spark.ml.r.ALSWrapper",
"fit", data@sdf, ratingCol, userCol, itemCol, as.integer(rank),
reg, as.integer(maxIter), implicitPrefs, alpha, nonnegative,
regParam, as.integer(maxIter), implicitPrefs, alpha, nonnegative,
as.integer(numUserBlocks), as.integer(numItemBlocks),
as.integer(checkpointInterval), as.integer(seed))
new("ALSModel", jobj = jobj)
@@ -1712,8 +1683,6 @@ print.summary.KSTest <- function(x, ...) {
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often the cache
#' should be checkpointed, or disable it, by setting checkpointInterval.
#' @param probabilityCol column name for predicted class conditional probabilities, only for
#' classification.
#' @param ... additional arguments passed to the method.
#' @aliases spark.randomForest,SparkDataFrame,formula-method
#' @return \code{spark.randomForest} returns a fitted Random Forest model.
@@ -1748,7 +1717,7 @@ setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "fo
maxDepth = 5, maxBins = 32, numTrees = 20, impurity = NULL,
featureSubsetStrategy = "auto", seed = NULL, subsamplingRate = 1.0,
minInstancesPerNode = 1, minInfoGain = 0.0, checkpointInterval = 10,
maxMemoryInMB = 256, cacheNodeIds = FALSE, probabilityCol = "probability") {
maxMemoryInMB = 256, cacheNodeIds = FALSE) {
type <- match.arg(type)
formula <- paste(deparse(formula), collapse = "")
if (!is.null(seed)) {
@@ -1777,7 +1746,7 @@ setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "fo
impurity, as.integer(minInstancesPerNode),
as.numeric(minInfoGain), as.integer(checkpointInterval),
as.character(featureSubsetStrategy), seed,
as.numeric(subsamplingRate), as.character(probabilityCol),
as.numeric(subsamplingRate),
as.integer(maxMemoryInMB), as.logical(cacheNodeIds))
new("RandomForestClassificationModel", jobj = jobj)
}
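
spark.randomForest loses its probabilityCol argument in the same sweep; the classifier keeps its default probability column internally rather than exposing it from R. A classification sketch under the reduced signature, again assuming the iris data (illustrative only):

df <- createDataFrame(iris)
# type = "classification" routes to the random forest classifier; note
# that probabilityCol can no longer be passed from R.
model <- spark.randomForest(df, Species ~ ., type = "classification",
                            numTrees = 20, maxDepth = 5)
head(select(predict(model, df), "prediction"))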