Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into SPARK-7422
Browse files Browse the repository at this point in the history
  • Loading branch information
George authored and George committed May 13, 2015
2 parents df9538a + 97dee31 commit 4526acc
Show file tree
Hide file tree
Showing 148 changed files with 6,757 additions and 1,010 deletions.
7 changes: 4 additions & 3 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ setMethod("isLocal",
callJMethod(x@sdf, "isLocal")
})

#' ShowDF
#' showDF
#'
#' Print the first numRows rows of a DataFrame
#'
Expand All @@ -170,7 +170,8 @@ setMethod("isLocal",
setMethod("showDF",
signature(x = "DataFrame"),
function(x, numRows = 20) {
callJMethod(x@sdf, "showString", numToInt(numRows))
s <- callJMethod(x@sdf, "showString", numToInt(numRows))
cat(s)
})

#' show
Expand All @@ -187,7 +188,7 @@ setMethod("showDF",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' show(df)
#' df
#'}
setMethod("show", "DataFrame",
function(object) {
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
})

setMethod("show", "RDD",
function(.Object) {
cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
function(object) {
cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep=""))
})

setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
Expand Down
4 changes: 1 addition & 3 deletions R/pkg/R/group.R
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ setMethod("agg",
}
}
jcols <- lapply(cols, function(c) { c@jc })
# the GroupedData.agg(col, cols*) API does not contain grouping Column
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "aggWithGrouping",
x@sgd, listToSeq(jcols))
sdf <- callJMethod(x@sgd, "agg", jcols[[1]], listToSeq(jcols[-1]))
} else {
stop("agg can only support Column or character")
}
Expand Down
3 changes: 2 additions & 1 deletion R/pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ test_that("toJSON() returns an RDD of the correct values", {

test_that("showDF()", {
df <- jsonFile(sqlCtx, jsonPath)
expect_output(showDF(df), "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
s <- capture.output(showDF(df))
expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
})

test_that("isLocal()", {
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ can be run using:
./dev/run-tests

Please see the guidance on how to
[run all automated tests](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-AutomatedTesting).
[run tests for a module, or individual tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).

## A Note About Hadoop Versions

Expand Down
47 changes: 0 additions & 47 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,35 +381,6 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
<configuration>
<target>
<unzip src="../python/lib/py4j-0.8.2.1-src.zip" dest="../python/build" />
</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<filesets>
<fileset>
<directory>${basedir}/../python/build</directory>
</fileset>
</filesets>
<verbose>true</verbose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
Expand Down Expand Up @@ -438,24 +409,6 @@
</executions>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>

<profiles>
Expand Down

Large diffs are not rendered by default.

80 changes: 33 additions & 47 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ function renderDagViz(forJob) {
svg.selectAll("#" + nodeId).classed("cached", true);
});

// More post-processing
drawClusterLabels(svg, forJob);
resizeSvg(svg);
}

Expand All @@ -151,7 +149,7 @@ function renderDagVizForStage(svgContainer) {
var dot = metadata.select(".dot-file").text();
var containerId = VizConstants.graphPrefix + metadata.attr("stage-id");
var container = svgContainer.append("g").attr("id", containerId);
renderDot(dot, container, StagePageVizConstants.rankSep);
renderDot(dot, container, false);

// Round corners on rectangles
svgContainer
Expand Down Expand Up @@ -209,7 +207,7 @@ function renderDagVizForJob(svgContainer) {
}

// Actually render the stage
renderDot(dot, container, JobPageVizConstants.rankSep);
renderDot(dot, container, true);

// Round corners on rectangles
container
Expand All @@ -231,14 +229,14 @@ function renderDagVizForJob(svgContainer) {
}

/* Render the dot file as an SVG in the given container. */
function renderDot(dot, container, rankSep) {
function renderDot(dot, container, forJob) {
var escaped_dot = dot
.replace(/&lt;/g, "<")
.replace(/&gt;/g, ">")
.replace(/&quot;/g, "\"");
var g = graphlibDot.read(escaped_dot);
g.graph().rankSep = rankSep;
var renderer = new dagreD3.render();
preprocessGraphLayout(g, forJob);
renderer(container, g);
}

Expand All @@ -251,50 +249,38 @@ function graphContainer() { return d3.select("#dag-viz-graph"); }
function metadataContainer() { return d3.select("#dag-viz-metadata"); }

/*
* Helper function to create draw a label for each cluster.
*
* We need to do this manually because dagre-d3 does not support labeling clusters.
* In general, the clustering support for dagre-d3 is quite limited at this point.
* Helper function to pre-process the graph layout.
* This step is necessary for certain styles that affect the positioning
* and sizes of graph elements, e.g. padding, font style, shape.
*/
function drawClusterLabels(svgContainer, forJob) {
var clusterLabelSize, stageClusterLabelSize;
function preprocessGraphLayout(g, forJob) {
var nodes = g.nodes();
for (var i = 0; i < nodes.length; i++) {
var isCluster = g.children(nodes[i]).length > 0;
if (!isCluster) {
var node = g.node(nodes[i]);
if (forJob) {
// Do not display RDD name on job page
node.shape = "circle";
node.labelStyle = "font-size: 0px";
} else {
node.labelStyle = "font-size: 12px";
}
node.padding = "5";
}
}
// Curve the edges
var edges = g.edges();
for (var j = 0; j < edges.length; j++) {
var edge = g.edge(edges[j]);
edge.lineInterpolate = "basis";
}
// Adjust vertical separation between nodes
if (forJob) {
clusterLabelSize = JobPageVizConstants.clusterLabelSize;
stageClusterLabelSize = JobPageVizConstants.stageClusterLabelSize;
g.graph().rankSep = JobPageVizConstants.rankSep;
} else {
clusterLabelSize = StagePageVizConstants.clusterLabelSize;
stageClusterLabelSize = StagePageVizConstants.stageClusterLabelSize;
g.graph().rankSep = StagePageVizConstants.rankSep;
}
svgContainer.selectAll("g.cluster").each(function() {
var cluster = d3.select(this);
var isStage = cluster.attr("id").indexOf(VizConstants.stageClusterPrefix) > -1;
var labelSize = isStage ? stageClusterLabelSize : clusterLabelSize;
drawClusterLabel(cluster, labelSize);
});
}

/*
* Helper function to draw a label for the given cluster element based on its name.
*
* In the process, we need to expand the bounding box to make room for the label.
* We need to do this because dagre-d3 did not take this into account when it first
* rendered the bounding boxes. Note that this means we need to adjust the view box
* of the SVG afterwards since we shifted a few boxes around.
*/
function drawClusterLabel(d3cluster, fontSize) {
var cluster = d3cluster;
var rect = d3cluster.select("rect");
rect.attr("y", toFloat(rect.attr("y")) - fontSize);
rect.attr("height", toFloat(rect.attr("height")) + fontSize);
var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - fontSize / 2;
var labelY = toFloat(rect.attr("y")) + fontSize * 1.5;
var labelText = cluster.attr("name").replace(VizConstants.clusterPrefix, "");
cluster.append("text")
.attr("x", labelX)
.attr("y", labelY)
.attr("text-anchor", "end")
.style("font-size", fontSize + "px")
.text(labelText);
}

/*
Expand Down Expand Up @@ -444,7 +430,7 @@ function addTooltipsForRDDs(svgContainer) {
if (tooltipText) {
node.select("circle")
.attr("data-toggle", "tooltip")
.attr("data-placement", "right")
.attr("data-placement", "bottom")
.attr("title", tooltipText)
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[spark] object PythonUtils {
def sparkPythonPath: String = {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
Expand All @@ -53,4 +53,11 @@ private[spark] object PythonUtils {
def toSeq[T](cols: JList[T]): Seq[T] = {
cols.toList.toSeq
}

/**
* Convert java map of K, V into Map of K, V (for calling API with varargs)
*/
def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = {
jm.toMap
}
}
52 changes: 41 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.{Logging, SparkConf, SparkException}

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -199,13 +199,43 @@ class SparkHadoopUtil extends Logging {
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
def recurse(path: Path): Array[FileStatus] = {
val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
listLeafStatuses(fs, fs.getFileStatus(basePath))
}

/**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
*/
def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}

val baseStatus = fs.getFileStatus(basePath)
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
}

def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
listLeafDirStatuses(fs, fs.getFileStatus(basePath))
}

def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}

assert(baseStatus.isDir)
recurse(baseStatus)
}

def globPath(pattern: Path): Seq[Path] = {
val fs = pattern.getFileSystem(conf)
Option(fs.globStatus(pattern)).map { statuses =>
statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
}.getOrElse(Seq.empty[Path])
}

/**
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1523,13 +1523,15 @@ abstract class RDD[T: ClassTag](
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint() {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", false, true) {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
Expand Down
Loading

0 comments on commit 4526acc

Please sign in to comment.