
Commit

Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
witgo committed May 12, 2014
2 parents 86e2048 + a6b02fb commit 743a7ad
Showing 151 changed files with 2,273 additions and 1,728 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -49,6 +49,7 @@ unit-tests.log
/lib/
rat-results.txt
scalastyle.txt
conf/*.conf

# For Hive
metastore_db/
19 changes: 12 additions & 7 deletions README.md
@@ -39,17 +39,22 @@ And run the following command, which should also return 1000:
## Example Programs

Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./bin/run-example <class> <params>`. For example:
To run one of them, use `./bin/run-example <class> [params]`. For example:

./bin/run-example org.apache.spark.examples.SparkLR local[2]
./bin/run-example org.apache.spark.examples.SparkLR

will run the Logistic Regression example locally on 2 CPUs.
will run the Logistic Regression example locally.

Each of the example programs prints usage help if no params are given.
You can set the MASTER environment variable when running examples to submit
examples to a cluster. This can be a mesos:// or spark:// URL,
"yarn-cluster" or "yarn-client" to run on YARN, and "local" to run
locally with one thread, or "local[N]" to run locally with N threads. You
can also use an abbreviated class name if the class is in the `examples`
package. For instance:

All of the Spark samples take a `<master>` parameter that is the cluster URL
to connect to. This can be a mesos:// or spark:// URL, or "local" to run
locally with one thread, or "local[N]" to run locally with N threads.
MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

## Running Tests

1 change: 1 addition & 0 deletions assembly/pom.xml
@@ -96,6 +96,7 @@
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>org.datanucleus:*</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -31,7 +31,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
if [[ $? != 0 ]]; then
echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
echo "You need to build Spark before running this program" >&2
exit 1
fi
fi
71 changes: 18 additions & 53 deletions bin/run-example
@@ -17,28 +17,10 @@
# limitations under the License.
#

cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

. $FWDIR/bin/load-spark-env.sh

if [ -z "$1" ]; then
echo "Usage: run-example <example-class> [<args>]" >&2
exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/examples

if [ -f "$FWDIR/RELEASE" ]; then
@@ -49,46 +31,29 @@ fi

if [[ -z $SPARK_EXAMPLES_JAR ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
echo "You need to build Spark before running this program" >&2
exit 1
fi

EXAMPLE_MASTER=${MASTER:-"local[*]"}

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
CLASSPATH=`cygpath -wp $CLASSPATH`
export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
if [ -n "$1" ]; then
EXAMPLE_CLASS="$1"
shift
else
echo "usage: ./bin/run-example <example-class> [example-args]"
echo " - set MASTER=XX to use a specific master"
echo " - can use abbreviated example class name (e.g. SparkPi, mllib.MovieLensALS)"
echo
exit -1
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
echo -n "Spark Command: "
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
echo "========================================"
echo
if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
./bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
$SPARK_EXAMPLES_JAR \
"$@"
2 changes: 1 addition & 1 deletion bin/spark-class
@@ -114,7 +114,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2
echo "You need to build Spark before running this program." >&2
exit 1
fi
if [ "$num_jars" -gt "1" ]; then
6 changes: 4 additions & 2 deletions bin/spark-submit
@@ -35,8 +35,10 @@ while (($#)); do
shift
done

if [ ! -z $DRIVER_MEMORY ] && [ ! -z $DEPLOY_MODE ] && [ $DEPLOY_MODE = "client" ]; then
export SPARK_MEM=$DRIVER_MEMORY
DEPLOY_MODE=${DEPLOY_MODE:-"client"}

if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
fi

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -21,6 +21,7 @@ import java.io.{ObjectInputStream, Serializable}

import scala.collection.generic.Growable
import scala.collection.mutable.Map
import scala.reflect.ClassTag

import org.apache.spark.serializer.JavaSerializer

@@ -164,9 +165,9 @@ trait AccumulableParam[R, T] extends Serializable {
def zero(initialValue: R): R
}

private[spark]
class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
extends AccumulableParam[R,T] {
private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
extends AccumulableParam[R, T] {

def addAccumulator(growable: R, elem: T): R = {
growable += elem
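
GrowableAccumulableParam now carries a ClassTag bound on the collection type. Its user-facing entry point is SparkContext.accumulableCollection (changed in the SparkContext.scala hunk below); a minimal usage sketch, assuming a running SparkContext named sc:

    import scala.collection.mutable

    // Driver-side accumulable backed by GrowableAccumulableParam; the ClassTag for
    // mutable.HashSet[String] is resolved implicitly at this call site.
    val seen = sc.accumulableCollection(mutable.HashSet[String]())

    // Workers may only add elements; the merged collection is read back on the driver.
    sc.parallelize(Seq("a", "b", "b")).foreach(word => seen += word)
    println(seen.value)   // e.g. Set(a, b)
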
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -74,10 +74,10 @@ class SparkContext(config: SparkConf) extends Logging {
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
this.preferredNodeLocationData = preferredNodeLocationData
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
this.preferredNodeLocationData = preferredNodeLocationData
}

/**
@@ -756,7 +756,7 @@ class SparkContext(config: SparkConf) extends Logging {
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param)
@@ -767,7 +767,7 @@ class SparkContext(config: SparkConf) extends Logging {
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T): Broadcast[T] = {
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging {
if (SparkHadoopUtil.get.isYarnMode() &&
(master == "yarn-standalone" || master == "yarn-cluster")) {
// In order for this to work in yarn-cluster mode the user must specify the
// --addjars option to the client to upload the file into the distributed cache
// --addJars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
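
SparkContext.broadcast now requires a ClassTag for the value type; the Scala compiler supplies it implicitly at ordinary call sites, so existing Scala code keeps compiling unchanged. A minimal sketch, assuming a running SparkContext named sc:

    // The ClassTag[Map[Int, String]] is filled in implicitly by the compiler.
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))

    // Tasks read the broadcast value; it is shipped to each executor only once.
    val names = sc.parallelize(Seq(1, 2, 3)).map(k => lookup.value.getOrElse(k, "?")).collect()
    // names: Array[String] = Array(one, two, ?)
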
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -281,8 +281,7 @@ object SparkEnv extends Logging {
val jvmInformation = Seq(
("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
("Java Home", Properties.javaHome),
("Scala Version", Properties.versionString),
("Scala Home", Properties.scalaHome)
("Scala Version", Properties.versionString)
).sorted

// Spark properties
20 changes: 11 additions & 9 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -28,21 +28,23 @@ import org.apache.spark.executor.TaskMetrics
*/
@DeveloperApi
class TaskContext(
val stageId: Int,
val partitionId: Int,
val attemptId: Long,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
) extends Serializable {
val stageId: Int,
val partitionId: Int,
val attemptId: Long,
val runningLocally: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends Serializable {

@deprecated("use partitionId", "0.8.1")
def splitId = partitionId

// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]

// Set to true when the task is completed, before the onCompleteCallbacks are executed.
// Whether the corresponding task has been killed.
@volatile var interrupted: Boolean = false

// Whether the task has completed, before the onCompleteCallbacks are executed.
@volatile var completed: Boolean = false

/**
@@ -58,6 +60,6 @@ class TaskContext(
def executeOnCompleteCallbacks() {
completed = true
// Process complete callbacks in the reverse order of registration
onCompleteCallbacks.reverse.foreach{_()}
onCompleteCallbacks.reverse.foreach { _() }
}
}
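
With this change, interrupted is a plain @volatile member rather than a constructor parameter. A sketch of the completion-callback lifecycle, assuming TaskContext also exposes an addOnCompleteCallback(f: () => Unit) registration method (not shown in this hunk):

    import org.apache.spark.TaskContext

    // Real tasks receive their TaskContext from the scheduler; constructing one directly
    // here only illustrates the flags and the callback ordering.
    val context = new TaskContext(stageId = 0, partitionId = 0, attemptId = 0L, runningLocally = true)

    context.addOnCompleteCallback(() => println(s"partition ${context.partitionId} done"))

    // Marks the task completed, then runs the callbacks in reverse registration order.
    context.executeOnCompleteCallbacks()
    assert(context.completed && !context.interrupted)
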
@@ -447,7 +447,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)
def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)(fakeClassTag)

/** Shut down the SparkContext. */
def stop() {
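
fakeClassTag itself is not part of this diff; presumably it is a small helper that manufactures a ClassTag for the Java API, where the element type has already been erased. A hypothetical sketch of such a helper:

    import scala.reflect.ClassTag

    object FakeClassTagSketch {
      // Hypothetical stand-in: reuse the AnyRef ClassTag, since the Java caller
      // cannot supply a real one for T.
      def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

      def main(args: Array[String]): Unit = {
        println(fakeClassTag[String].runtimeClass)   // class java.lang.Object
      }
    }
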
15 changes: 8 additions & 7 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag](
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
case 0 => Array.empty[Byte]
case SpecialLengths.TIMING_DATA =>
// Timing data from worker
val bootTime = stream.readLong()
@@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag](
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
Array.empty[Byte]
null
}
} catch {

@@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag](

var _nextObj = read()

def hasNext = _nextObj.length != 0
def hasNext = _nextObj != null
}
new InterruptibleIterator(context, stdoutIterator)
}
@@ -179,18 +180,18 @@ private[spark] class PythonRDD[T: ClassTag](
dataOut.writeInt(split.index)
// sparkFilesDir
PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
// Python includes (*.zip and *.egg files)
dataOut.writeInt(pythonIncludes.length)
for (include <- pythonIncludes) {
PythonRDD.writeUTF(include, dataOut)
}
// Broadcast variables
dataOut.writeInt(broadcastVars.length)
for (broadcast <- broadcastVars) {
dataOut.writeLong(broadcast.id)
dataOut.writeInt(broadcast.value.length)
dataOut.write(broadcast.value)
}
// Python includes (*.zip and *.egg files)
dataOut.writeInt(pythonIncludes.length)
for (include <- pythonIncludes) {
PythonRDD.writeUTF(include, dataOut)
}
dataOut.flush()
// Serialized command:
dataOut.writeInt(command.length)
@@ -21,6 +21,8 @@ import java.io.Serializable

import org.apache.spark.SparkException

import scala.reflect.ClassTag

/**
* A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable
* cached on each machine rather than shipping a copy of it with tasks. They can be used, for
@@ -50,7 +52,7 @@ import org.apache.spark.SparkException
* @param id A unique identifier for the broadcast variable.
* @tparam T Type of the data contained in the broadcast variable.
*/
abstract class Broadcast[T](val id: Long) extends Serializable {
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {

/**
* Flag signifying whether the broadcast variable is valid
@@ -17,6 +17,8 @@

package org.apache.spark.broadcast

import scala.reflect.ClassTag

import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
@@ -31,7 +33,7 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
trait BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
def stop(): Unit
}
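
Because newBroadcast now takes a ClassTag, any third-party BroadcastFactory implementation must add the same bound. A skeletal, hypothetical sketch in which only the signatures matter:

    import scala.reflect.ClassTag
    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.broadcast.{Broadcast, BroadcastFactory}

    // Hypothetical no-op factory showing the signatures an implementation must now provide.
    class NoopBroadcastFactory extends BroadcastFactory {
      def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit = ()
      def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T] =
        throw new UnsupportedOperationException("sketch only")
      def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = ()
      def stop(): Unit = ()
    }
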
@@ -19,6 +19,8 @@ package org.apache.spark.broadcast

import java.util.concurrent.atomic.AtomicLong

import scala.reflect.ClassTag

import org.apache.spark._

private[spark] class BroadcastManager(
@@ -56,7 +58,7 @@ private[spark] class BroadcastManager(

private val nextBroadcastId = new AtomicLong(0)

def newBroadcast[T](value_ : T, isLocal: Boolean) = {
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
