Skip to content

Commit

Permalink
Merge pull request apache#167 from sun-rui/removePartionByInRDD
Browse files Browse the repository at this point in the history
Remove partitionBy() in RDD.
  • Loading branch information
shivaram committed Feb 16, 2015
2 parents 52f94c4 + 7fcb46a commit 2271030
Showing 1 changed file with 36 additions and 119 deletions.
155 changes: 36 additions & 119 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1275,87 +1275,51 @@ setMethod("aggregateRDD",
Reduce(combOp, partitionList, zeroValue)
})

############ Shuffle Functions ############

#' Partition an RDD by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
#' For each element of this RDD, the partitioner is used to compute a hash
#' function and the RDD is partitioned using this hash value.
#'
#' @param rdd The RDD to partition. Should be an RDD where each element is
#' list(K, V) or c(K, V).
#' @param numPartitions Number of partitions to create.
#' @param ... Other optional arguments to partitionBy.
# TODO: Consider caching the name in the RDD's environment
#' Return an RDD's name.
#'
#' @param partitionFunc The partition function to use. Uses a default hashCode
#' function if not provided
#' @return An RDD partitioned using the specified partitioner.
#' @rdname partitionBy
#' @param rdd The RDD whose name is returned.
#' @rdname name
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- partitionBy(rdd, 2L)
#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
#' rdd <- parallelize(sc, list(1,2,3))
#' name(rdd) # NULL (if not set before)
#'}
setGeneric("partitionBy",
function(rdd, numPartitions, ...) {
standardGeneric("partitionBy")
})

#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
setMethod("partitionBy",
signature(rdd = "RDD", numPartitions = "integer"),
function(rdd, numPartitions, partitionFunc = hashCode) {

#if (missing(partitionFunc)) {
# partitionFunc <- hashCode
#}

depsBinArr <- getDependencies(partitionFunc)
setGeneric("name", function(rdd) { standardGeneric("name") })

serializedHashFuncBytes <- serialize(as.character(substitute(partitionFunc)),
connection = NULL,
ascii = TRUE)
#' @rdname name
#' @aliases name,RDD
setMethod("name",
signature(rdd = "RDD"),
function(rdd) {
callJMethod(getJRDD(rdd), "name")
})

packageNamesArr <- serialize(.sparkREnv$.packages,
connection = NULL,
ascii = TRUE)
broadcastArr <- lapply(ls(.broadcastNames), function(name) {
get(name, .broadcastNames) })
jrdd <- getJRDD(rdd)
#' Set an RDD's name.
#'
#' @param rdd The RDD whose name is to be set.
#' @param name The RDD name to be set.
#' @return a new RDD renamed.
#' @rdname setName
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1,2,3))
#' setName(rdd, "myRDD")
#' name(rdd) # "myRDD"
#'}
setGeneric("setName", function(rdd, name) { standardGeneric("setName") })

# We create a PairwiseRRDD that extends RDD[(Array[Byte],
# Array[Byte])], where the key is the hashed split, the value is
# the content (key-val pairs).
pairwiseRRDD <- newJObject("edu.berkeley.cs.amplab.sparkr.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
as.integer(numPartitions),
serializedHashFuncBytes,
rdd@env$serialized,
depsBinArr,
packageNamesArr,
as.character(.sparkREnv$libname),
broadcastArr,
callJMethod(jrdd, "classTag"))

# Create a corresponding partitioner.
rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
as.integer(numPartitions))

# Call partitionBy on the obtained PairwiseRDD.
javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
javaPairRDD <- callJMethod(javaPairRDD, "partitionBy", rPartitioner)

# Call .values() on the result to get back the final result, the
# shuffled acutal content key-val pairs.
r <- callJMethod(javaPairRDD, "values")

RDD(r, serialized = TRUE)
#' @rdname setName
#' @aliases setName,RDD
setMethod("setName",
signature(rdd = "RDD", name = "character"),
function(rdd, name) {
callJMethod(getJRDD(rdd), "setName", name)
rdd
})

############ Binary Functions #############
Expand Down Expand Up @@ -1397,50 +1361,3 @@ setMethod("unionRDD",
}
union.rdd
})

# TODO: Consider caching the name in the RDD's environment
#' Return an RDD's name.
#'
#' @param rdd The RDD whose name is returned.
#' @rdname name
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1,2,3))
#' name(rdd) # NULL (if not set before)
#'}
setGeneric("name", function(rdd) { standardGeneric("name") })

#' @rdname name
#' @aliases name,RDD
setMethod("name",
signature(rdd = "RDD"),
function(rdd) {
callJMethod(getJRDD(rdd), "name")
})

#' Set an RDD's name.
#'
#' @param rdd The RDD whose name is to be set.
#' @param name The RDD name to be set.
#' @return a new RDD renamed.
#' @rdname setName
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1,2,3))
#' setName(rdd, "myRDD")
#' name(rdd) # "myRDD"
#'}
setGeneric("setName", function(rdd, name) { standardGeneric("setName") })

#' @rdname setName
#' @aliases setName,RDD
setMethod("setName",
signature(rdd = "RDD", name = "character"),
function(rdd, name) {
callJMethod(getJRDD(rdd), "setName", name)
rdd
})

0 comments on commit 2271030

Please sign in to comment.