Merge branch 'PythonGmmWrapper', remote-tracking branch 'upstream/master' into PythonGmmWrapper
FlytxtRnD committed Feb 2, 2015
2 parents 2e14d82 + d85cd4e commit b22532c
Showing 250 changed files with 10,637 additions and 3,295 deletions.
22 changes: 0 additions & 22 deletions assembly/pom.xml
@@ -43,12 +43,6 @@
</properties>

<dependencies>
<!-- Promote Guava to compile scope in this module so it's included while shading. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -133,22 +127,6 @@
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.spark-project.guava</shadedPattern>
<includes>
<include>com.google.common.**</include>
</includes>
<excludes>
<exclude>com/google/common/base/Absent*</exclude>
<exclude>com/google/common/base/Function</exclude>
<exclude>com/google/common/base/Optional*</exclude>
<exclude>com/google/common/base/Present*</exclude>
<exclude>com/google/common/base/Supplier</exclude>
</excludes>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
4 changes: 2 additions & 2 deletions build/mvn
@@ -48,11 +48,11 @@ install_app() {
# check if we already have the tarball
# check if we have curl installed
# download application
[ ! -f "${local_tarball}" ] && [ -n "`which curl 2>/dev/null`" ] && \
[ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \
echo "exec: curl ${curl_opts} ${remote_tarball}" && \
curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
# if the file still doesn't exist, lets try `wget` and cross our fingers
[ ! -f "${local_tarball}" ] && [ -n "`which wget 2>/dev/null`" ] && \
[ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \
echo "exec: wget ${wget_opts} ${remote_tarball}" && \
wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
# if both were unsuccessful, exit
4 changes: 2 additions & 2 deletions build/sbt-launch-lib.bash
@@ -50,9 +50,9 @@ acquire_sbt_jar () {
# Download
printf "Attempting to fetch sbt\n"
JAR_DL="${JAR}.part"
if hash curl 2>/dev/null; then
if [ $(command -v curl) ]; then
(curl --silent ${URL1} > "${JAR_DL}" || curl --silent ${URL2} > "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
elif hash wget 2>/dev/null; then
elif [ $(command -v wget) ]; then
(wget --quiet ${URL1} -O "${JAR_DL}" || wget --quiet ${URL2} -O "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
else
printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
1 change: 1 addition & 0 deletions conf/metrics.properties.template
@@ -87,6 +87,7 @@
# period 10 Poll period
# unit seconds Units of poll period
# prefix EMPTY STRING Prefix to prepend to metric name
# protocol tcp Protocol ("tcp" or "udp") to use

## Examples
# Enable JmxSink for all instances by class name
60 changes: 8 additions & 52 deletions core/pom.xml
@@ -34,6 +34,10 @@
<name>Spark Project Core</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
@@ -106,16 +110,6 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<!--
Promote Guava to "compile" so that maven-shade-plugin picks it up (for packaging the Optional
class exposed in the Java API). The plugin will then remove this dependency from the published
pom, so that Guava does not pollute the client's compilation classpath.
-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -204,19 +198,19 @@
<artifactId>stream</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
@@ -350,44 +344,6 @@
<verbose>true</verbose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>com.google.guava:guava</include>
</includes>
</artifactSet>
<filters>
<!-- See comment in the guava dependency declaration above. -->
<filter>
<artifact>com.google.guava:guava</artifact>
<includes>
<include>com/google/common/base/Absent*</include>
<include>com/google/common/base/Function</include>
<include>com/google/common/base/Optional*</include>
<include>com/google/common/base/Present*</include>
<include>com/google/common/base/Supplier</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<!--
Copy guava to the build directory. This is needed to make the SPARK_PREPEND_CLASSES
option work in compute-classpath.sh, since it would put the non-shaded Spark classes in
the runtime classpath.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -10,4 +10,3 @@ log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1986,7 +1986,7 @@ object SparkContext extends Logging {
case "yarn-client" =>
val scheduler = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

37 changes: 37 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -348,6 +348,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)

/**
* Reduces the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree
* @see [[org.apache.spark.api.java.JavaRDDLike#reduce]]
*/
def treeReduce(f: JFunction2[T, T, T], depth: Int): T = rdd.treeReduce(f, depth)

/**
* [[org.apache.spark.api.java.JavaRDDLike#treeReduce]] with suggested depth 2.
*/
def treeReduce(f: JFunction2[T, T, T]): T = treeReduce(f, 2)

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
@@ -369,6 +382,30 @@
combOp: JFunction2[U, U, U]): U =
rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])

/**
* Aggregates the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree
* @see [[org.apache.spark.api.java.JavaRDDLike#aggregate]]
*/
def treeAggregate[U](
zeroValue: U,
seqOp: JFunction2[U, T, U],
combOp: JFunction2[U, U, U],
depth: Int): U = {
rdd.treeAggregate(zeroValue)(seqOp, combOp, depth)(fakeClassTag[U])
}

/**
* [[org.apache.spark.api.java.JavaRDDLike#treeAggregate]] with suggested depth 2.
*/
def treeAggregate[U](
zeroValue: U,
seqOp: JFunction2[U, T, U],
combOp: JFunction2[U, U, U]): U = {
treeAggregate(zeroValue, seqOp, combOp, 2)
}

/**
* Return the number of elements in the RDD.
*/
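The two wrappers above simply delegate to RDD.treeReduce and RDD.treeAggregate. The standalone Scala sketch below is not part of this commit; the app name, master URL, and data are assumptions. It only illustrates the multi-level combine that the "suggested depth" of 2 refers to.

import org.apache.spark.{SparkConf, SparkContext}

object TreeAggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("tree-agg-sketch").setMaster("local[4]"))
    val nums = sc.parallelize(1 to 100000, numSlices = 64)

    // treeReduce merges partition results through intermediate reducers
    // (depth 2 here) instead of pulling every partial result to the driver.
    val sum = nums.treeReduce(_ + _, depth = 2)

    // treeAggregate does the same for aggregations that start from a zero value.
    val sumOfSquares = nums.treeAggregate(0L)(
      (acc, x) => acc + x.toLong * x, // seqOp: fold elements within a partition
      _ + _,                          // combOp: merge per-partition results
      depth = 2)

    println(s"sum=$sum sumOfSquares=$sumOfSquares")
    sc.stop()
  }
}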
@@ -138,6 +138,11 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
case array: Array[Any] => {
val arrayWriteable = new ArrayWritable(classOf[Writable])
arrayWriteable.set(array.map(convertToWritable(_)))
arrayWriteable
}
case other => throw new SparkException(
s"Data of type ${other.getClass.getName} cannot be used")
}
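The new case above wraps an Array[Any] in Hadoop's ArrayWritable after converting each element. A minimal Scala sketch of that wrapping outside the converter (the IntWritable values and the object name are assumptions):

import org.apache.hadoop.io.{ArrayWritable, IntWritable, Writable}

object ArrayWritableSketch {
  // Declare the element class, then set the already-converted Writable values,
  // mirroring what the converter does for each array it receives.
  val values: Array[Writable] = Array(new IntWritable(1), new IntWritable(2), new IntWritable(3))
  val wrapped = new ArrayWritable(classOf[IntWritable])
  wrapped.set(values)
}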
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -67,17 +67,16 @@ private[spark] class PythonRDD(
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
// Whether is the worker released into idle pool
@volatile var released = false

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)

var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
if (reuse_worker && complete_cleanly) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
} else {
if (!reuse_worker || !released) {
try {
worker.close()
} catch {
@@ -145,8 +144,12 @@
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
// Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
complete_cleanly = true
if (reuse_worker) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
}
}
null
}
18 changes: 13 additions & 5 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -153,7 +153,10 @@ private[spark] object SerDeUtil extends Logging {
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
obj.asInstanceOf[JArrayList[_]].asScala
obj match {
case array: Array[Any] => array.toSeq
case _ => obj.asInstanceOf[JArrayList[_]].asScala
}
} else {
Seq(obj)
}
@@ -199,7 +202,10 @@
* representation is serialized
*/
def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
val (keyFailed, valueFailed) = checkPickle(rdd.first())
val (keyFailed, valueFailed) = rdd.take(1) match {
case Array() => (false, false)
case Array(first) => checkPickle(first)
}

rdd.mapPartitions { iter =>
val cleaned = iter.map { case (k, v) =>
@@ -226,10 +232,12 @@
}

val rdd = pythonToJava(pyRDD, batched).rdd
rdd.first match {
case obj if isPair(obj) =>
rdd.take(1) match {
case Array(obj) if isPair(obj) =>
// we only accept (K, V)
case other => throw new SparkException(
case Array() =>
// we also accept empty collections
case Array(other) => throw new SparkException(
s"RDD element of type ${other.getClass.getName} cannot be used")
}
rdd.map { obj =>
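Both hunks above replace a call to rdd.first() with a match on rdd.take(1), so that empty RDDs no longer throw. A minimal sketch of that pattern as a reusable helper (the helper name is ours, not Spark's):

import org.apache.spark.rdd.RDD

object EmptySafeFirst {
  // take(1) returns an empty array for an empty RDD, whereas first() throws
  // an exception on empty input, so matching on the result lets empty
  // collections pass through cleanly.
  def firstOrNone[T](rdd: RDD[T]): Option[T] = rdd.take(1) match {
    case Array()      => None
    case Array(first) => Some(first)
  }
}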
16 changes: 6 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -133,10 +133,9 @@ class SparkHadoopUtil extends Logging {
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics(path, conf)
val threadStats = getFileSystemThreadStatistics()
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
@@ -156,10 +155,9 @@
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics(path, conf)
val threadStats = getFileSystemThreadStatistics()
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
@@ -172,10 +170,8 @@
}
}

private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
val stats = FileSystem.getAllStatistics()
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}

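The refactored callbacks above return Option[() => Long]; judging from the baseline captured in the kept context lines, each later invocation reports the bytes read (or written) on the current thread since the callback was created, and None is returned when the Hadoop version does not expose these statistics (pre-2.5, per the doc comment). A hypothetical usage sketch follows; the package and helper names are ours, and the package prefix is only there because the method is private[spark].

package org.apache.spark.sketch

import org.apache.spark.deploy.SparkHadoopUtil

object BytesReadSketch {
  // Measure how many bytes a block of Hadoop reads pulls in on this thread.
  def measure(doRead: () => Unit): Long = {
    val callback = SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
    doRead()
    // The callback, when available, reports bytes read since it was created.
    callback.map(c => c()).getOrElse(0L)
  }
}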
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -144,6 +144,8 @@ object SparkSubmit {
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}

@@ -408,6 +410,13 @@
mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
}

/**
* Return whether the given main class represents a thrift server.
*/
private[spark] def isThriftServer(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
}

/**
* Return whether the given primary resource requires running python.
*/
@@ -76,7 +76,6 @@ private[spark] class Executor(
}

val executorSource = new ExecutorSource(this, executorId)
conf.set("spark.executor.id", executorId)

if (!isLocal) {
env.metricsSystem.registerSource(executorSource)
@@ -19,7 +19,6 @@ package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod

import scala.collection.mutable.ArrayBuffer
(Diff truncated; the remaining changed files are not shown here.)