Merge pull request #1 from apache/master
merging upstream updates
nchammas committed Jun 10, 2014
2 parents 41c4a33 + 1abbde0 commit 69da6cf
Showing 86 changed files with 2,564 additions and 239 deletions.
4 changes: 2 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -80,7 +80,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
test("large number of iterations") {
// This tests whether jobs with a large number of iterations finish in a reasonable time,
// because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
- failAfter(10 seconds) {
+ failAfter(30 seconds) {
sc = new SparkContext("local", "test")
val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
@@ -101,7 +101,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
sc = new SparkContext("local", "test")
val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
- val numSupersteps = 50
+ val numSupersteps = 20
val result =
Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
(self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
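For context, failAfter above comes from ScalaTest's timeout support. A minimal sketch of the pattern, assuming a pre-3.0 ScalaTest (the Timeouts trait plus SpanSugar for the `30 seconds` syntax); the suite name and body here are illustrative only:

import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

// Hypothetical suite: failAfter aborts the test if the enclosed block
// does not finish within the given span.
class TimeoutSketchSuite extends FunSuite with Timeouts {
  test("bounded-time work") {
    failAfter(30 seconds) {
      (1 to 1000000).sum  // stand-in for the Bagel job under test
    }
  }
}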
6 changes: 5 additions & 1 deletion bin/pyspark
@@ -86,6 +86,10 @@ else
if [[ "$IPYTHON" = "1" ]]; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON"
if [[ -n $SPARK_TESTING ]]; then
exec "$PYSPARK_PYTHON" -m doctest
else
exec "$PYSPARK_PYTHON"
fi
fi
fi
2 changes: 1 addition & 1 deletion bin/run-example
@@ -51,7 +51,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
fi

- ./bin/spark-submit \
+ "$FWDIR"/bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
"$SPARK_EXAMPLES_JAR" \
20 changes: 19 additions & 1 deletion core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -83,11 +83,17 @@ class HashPartitioner(partitions: Int) extends Partitioner {
case _ =>
false
}

+ override def hashCode: Int = numPartitions
}

/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
+ *
+ * Note that the actual number of partitions created by the RangePartitioner might not be the same
+ * as the `partitions` parameter, in the case where the number of sampled records is less than
+ * the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
@@ -119,7 +125,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
}

- def numPartitions = partitions
+ def numPartitions = rangeBounds.length + 1

private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

@@ -155,4 +161,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
case _ =>
false
}

+ override def hashCode(): Int = {
+   val prime = 31
+   var result = 1
+   var i = 0
+   while (i < rangeBounds.length) {
+     result = prime * result + rangeBounds(i).hashCode
+     i += 1
+   }
+   result = prime * result + ascending.hashCode
+   result
+ }
}
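The partitioner changes above all follow the same contract: once equals is overridden, hashCode must be overridden so that equal partitioners hash alike, otherwise hash-based lookups and partitioner comparisons can misbehave. A minimal illustration of the contract using HashPartitioner; the object and assertions below are an editor sketch, not part of the patch:

import org.apache.spark.HashPartitioner

// Two HashPartitioners over the same number of partitions are equal;
// with the hashCode override they also hash alike, so hash-based
// collections treat them as the same key.
object PartitionerContractSketch {
  def main(args: Array[String]): Unit = {
    val a = new HashPartitioner(8)
    val b = new HashPartitioner(8)
    assert(a == b)                    // equals compares numPartitions
    assert(a.hashCode == b.hashCode)  // guaranteed by the new override
    val byPartitioner = Map(a -> "already partitioned")
    assert(byPartitioner.contains(b)) // lookup with an equal instance works
  }
}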
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
- minPartitions).map(pair => pair._2.toString)
+ minPartitions).map(pair => pair._2.toString).setName(path)
}

/**
@@ -496,7 +496,7 @@ class SparkContext(config: SparkConf) extends Logging {
classOf[String],
classOf[String],
updateConf,
- minPartitions)
+ minPartitions).setName(path)
}

/**
@@ -551,7 +551,7 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass,
keyClass,
valueClass,
- minPartitions)
+ minPartitions).setName(path)
}

/**
@@ -623,7 +623,7 @@ class SparkContext(config: SparkConf) extends Logging {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
- new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+ new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}

/**
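The repeated .setName(path) additions label each input RDD with the path it was read from; the name is what RDD.toString and the web UI display. A minimal sketch of the effect, assuming a local master and a placeholder input path:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: with this patch, textFile (and the other Hadoop input methods)
// name the returned RDD after its input path. Nothing is read until an
// action runs, so the placeholder path does not need to exist.
object RddNameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("rdd-name-sketch"))
    try {
      val lines = sc.textFile("/tmp/input.txt") // placeholder path
      assert(lines.name == "/tmp/input.txt")    // the RDD now carries its source path
      println(lines)                            // toString includes the name
    } finally {
      sc.stop()
    }
  }
}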
129 changes: 129 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
import org.apache.spark.annotation.Experimental


/**
* :: Experimental ::
* A trait for use with reading custom classes in PySpark. Implement this trait and add custom
* transformation code by overriding the convert method.
*/
@Experimental
trait Converter[T, U] extends Serializable {
def convert(obj: T): U
}

private[python] object Converter extends Logging {

def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
case Success(c) => c
case Failure(err) =>
logError(s"Failed to load converter: $cc")
throw err
}
}.getOrElse { new DefaultConverter }
}
}

/**
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
private[python] class DefaultConverter extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
* object representation
*/
private def convertWritable(writable: Writable): Any = {
import collection.JavaConversions._
writable match {
case iw: IntWritable => iw.get()
case dw: DoubleWritable => dw.get()
case lw: LongWritable => lw.get()
case fw: FloatWritable => fw.get()
case t: Text => t.toString
case bw: BooleanWritable => bw.get()
case byw: BytesWritable => byw.getBytes
case n: NullWritable => null
case aw: ArrayWritable => aw.get().map(convertWritable(_))
case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
(convertWritable(k), convertWritable(v))
}.toMap)
case other => other
}
}

def convert(obj: Any): Any = {
obj match {
case writable: Writable =>
convertWritable(writable)
case _ =>
obj
}
}
}

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

/**
* Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
*/
def mapToConf(map: java.util.Map[String, String]): Configuration = {
import collection.JavaConversions._
val conf = new Configuration()
map.foreach{ case (k, v) => conf.set(k, v) }
conf
}

/**
* Merges two configurations, returns a copy of left with keys from right overwriting
* any matching keys in left
*/
def mergeConfs(left: Configuration, right: Configuration): Configuration = {
import collection.JavaConversions._
val copy = new Configuration(left)
right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
copy
}

/**
* Converts an RDD of key-value pairs, where key and/or value could be instances of
* [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
keyConverter: Converter[Any, Any],
valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
}

}
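The Converter trait above is the extension point for reading custom Writable types from PySpark: implement convert, make sure the class has a no-arg constructor (Converter.getInstance builds it via newInstance), and pass the class name to the Python-side Hadoop input API. A minimal sketch of a custom converter; the class below is illustrative and not part of the patch:

import java.util.{HashMap => JHashMap}

import org.apache.hadoop.io.MapWritable
import org.apache.spark.api.python.Converter

// Illustrative converter: flattens a MapWritable into a plain
// java.util.Map[String, String] that the Python side can pickle.
class MapWritableToStringMap extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case mw: MapWritable =>
      val out = new JHashMap[String, String]()
      val it = mw.entrySet().iterator()
      while (it.hasNext) {
        val entry = it.next()
        out.put(entry.getKey.toString, entry.getValue.toString)
      }
      out
    case other => other // anything else passes through unchanged
  }
}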
core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -50,4 +50,6 @@ private[spark] class PythonPartitioner(
case _ =>
false
}

+ override def hashCode: Int = 31 * numPartitions + pyPartitionFunctionId.hashCode
}