
Merge branch 'master' of github.com:apache/spark into pyspark-submit
andrewor14 committed May 16, 2014
2 parents a823661 + 032d663 commit 05879fa
Showing 12 changed files with 161 additions and 30 deletions.
@@ -65,7 +65,7 @@ private[spark] class Worker(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
-  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
+  val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
   val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
   // TTL for app folders/data; after TTL expires it will be cleaned up
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -33,7 +33,7 @@ import org.apache.spark.util.ByteBufferInputStream
  * - [[org.apache.spark.scheduler.ResultTask]]
  *
  * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
- * ResultTask's, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
+ * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
  * and sends the task output back to the driver application. A ShuffleMapTask executes the task
  * and divides the task output to multiple buckets (based on the task's partitioner).
  *
@@ -105,7 +105,7 @@ private[spark] class TaskSchedulerImpl(
     SchedulingMode.withName(schedulingModeConf.toUpperCase)
   } catch {
     case e: java.util.NoSuchElementException =>
-      throw new SparkException(s"Urecognized spark.scheduler.mode: $schedulingModeConf")
+      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
   }
 
   // This is a var so that we can reset it for testing purposes.
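
For context, a minimal standalone sketch (not Spark code) of the failure mode this hunk handles: scala.Enumeration.withName throws java.util.NoSuchElementException for an unknown name, which the catch above converts into a SparkException naming the bad spark.scheduler.mode value.

    object Mode extends Enumeration { val FAIR, FIFO, NONE = Value }  // stand-in for SchedulingMode

    val ok = Mode.withName("fifo".toUpperCase)  // parses to FIFO
    val msg = try { Mode.withName("BOGUS"); "parsed" } catch {
      case _: NoSuchElementException => "Unrecognized spark.scheduler.mode: BOGUS"
    }
    assert(msg.startsWith("Unrecognized"))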
5 changes: 3 additions & 2 deletions docs/configuration.md
@@ -390,10 +390,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td>spark.worker.cleanup.enabled</td>
-  <td>true</td>
+  <td>false</td>
   <td>
     Enable periodic cleanup of worker / application directories. Note that this only affects standalone
-    mode, as YARN works differently.
+    mode, as YARN works differently. Application directories are cleaned up regardless of whether
+    the application is still running.
   </td>
 </tr>
 <tr>
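
Since the default is now off, standalone operators who relied on automatic cleanup must opt back in. A minimal sketch, assuming only the property names shown in this diff (on a real cluster these would be set in conf/spark-defaults.conf for the worker daemon, not in application code):

    import org.apache.spark.SparkConf

    val workerConf = new SparkConf()
      .set("spark.worker.cleanup.enabled", "true")              // opt back in to periodic cleanup
      .set("spark.worker.cleanup.interval", (30 * 60).toString) // seconds between sweeps

    // Mirrors how Worker.scala above reads the flag, with the new `false` default.
    assert(workerConf.getBoolean("spark.worker.cleanup.enabled", false))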
10 changes: 8 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -56,7 +56,13 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
 
 object Edge {
   private[graphx] def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
-    override def compare(a: Edge[ED], b: Edge[ED]): Int =
-      (if (a.srcId != b.srcId) a.srcId - b.srcId else a.dstId - b.dstId).toInt
+    override def compare(a: Edge[ED], b: Edge[ED]): Int = {
+      if (a.srcId == b.srcId) {
+        if (a.dstId == b.dstId) 0
+        else if (a.dstId < b.dstId) -1
+        else 1
+      } else if (a.srcId < b.srcId) -1
+      else 1
+    }
   }
 }
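
Why the rewrite: the old one-liner compared 64-bit vertex IDs by subtracting them and narrowing to Int, which can both overflow the Long subtraction and drop the sign during truncation. A self-contained sketch of the two failure modes:

    val hi = Long.MaxValue
    val lo = Long.MinValue
    assert(hi > lo)
    assert((hi - lo).toInt == -1)  // Long subtraction overflows to -1, so "hi < lo"

    val big = 0x100000000L         // 2^32
    assert(big > 1L)
    assert((big - 1L).toInt == -1) // .toInt keeps only the low 32 bits, so "big < 1"

The new compare avoids arithmetic entirely by branching on equality and less-than, which is correct for every pair of Long IDs.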
39 changes: 39 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx

import org.scalatest.FunSuite

class EdgeSuite extends FunSuite {
test ("compare") {
// decending order
val testEdges: Array[Edge[Int]] = Array(
Edge(0x7FEDCBA987654321L, -0x7FEDCBA987654321L, 1),
Edge(0x2345L, 0x1234L, 1),
Edge(0x1234L, 0x5678L, 1),
Edge(0x1234L, 0x2345L, 1),
Edge(-0x7FEDCBA987654321L, 0x7FEDCBA987654321L, 1)
)
// sort into ascending order
val sortedEdges = testEdges.sorted(Edge.lexicographicOrdering[Int])

for (i <- 0 until testEdges.length) {
assert(sortedEdges(i) == testEdges(testEdges.length - i - 1))
}
}
}
35 changes: 34 additions & 1 deletion pom.xml
@@ -123,6 +123,7 @@
     <protobuf.version>2.4.1</protobuf.version>
     <yarn.version>${hadoop.version}</yarn.version>
     <hbase.version>0.94.6</hbase.version>
+    <zookeeper.version>3.4.5</zookeeper.version>
     <hive.version>0.12.0</hive.version>
     <parquet.version>1.4.3</parquet.version>
     <jblas.version>1.2.3</jblas.version>
@@ -194,6 +195,17 @@
         <enabled>false</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <id>mapr-repo</id>
+      <name>MapR Repository</name>
+      <url>http://repository.mapr.com/maven</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
   </repositories>
 
   <dependencyManagement>
@@ -490,6 +502,14 @@
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api-2.5</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -979,6 +999,19 @@
     </modules>
   </profile>
 
+  <profile>
+    <id>mapr</id>
+    <activation>
+      <activeByDefault>false</activeByDefault>
+    </activation>
+    <properties>
+      <hadoop.version>1.0.3-mapr-3.0.3</hadoop.version>
+      <yarn.version>2.3.0-mapr-4.0.0-beta</yarn.version>
+      <hbase.version>0.94.17-mapr-1403</hbase.version>
+      <zookeeper.version>3.4.5-mapr-1401</zookeeper.version>
+    </properties>
+  </profile>
+
   <!-- Build without Hadoop dependencies that are included in some runtime environments. -->
   <profile>
     <id>hadoop-provided</id>
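
Note that the new profile is opt-in (activeByDefault is false): a MapR build has to be requested explicitly, e.g. via Maven's standard -Pmapr profile switch, at which point its version properties override the defaults declared earlier, including the new zookeeper.version property used in the hunk below.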
@@ -1024,7 +1057,7 @@
       <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
-        <version>3.4.5</version>
+        <version>${zookeeper.version}</version>
         <scope>provided</scope>
       </dependency>
     </dependencies>
@@ -98,13 +98,19 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == false || r == false) {
-      false
-    } else if (l == null || r == null ) {
-      null
+    if (l == false) {
+      false
     } else {
-      true
+      val r = right.eval(input)
+      if (r == false) {
+        false
+      } else {
+        if (l != null && r != null) {
+          true
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -114,13 +120,19 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == true || r == true) {
+    if (l == true) {
       true
-    } else if (l == null || r == null) {
-      null
     } else {
-      false
+      val r = right.eval(input)
+      if (r == true) {
+        true
+      } else {
+        if (l != null && r != null) {
+          false
+        } else {
+          null
+        }
+      }
     }
   }
 }
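
Both rewrites implement SQL's three-valued logic with short-circuiting: the right operand is only evaluated when the left one does not already decide the result. A standalone sketch (not Catalyst code) using Option[Boolean], with None standing in for NULL:

    def and3(l: Option[Boolean], r: => Option[Boolean]): Option[Boolean] = l match {
      case Some(false) => Some(false)  // left already decides: right is never evaluated
      case _ => r match {
        case Some(false) => Some(false)
        case Some(true) if l == Some(true) => Some(true)
        case _ => None                 // a NULL operand and no FALSE: result is NULL
      }
    }

    def or3(l: Option[Boolean], r: => Option[Boolean]): Option[Boolean] = l match {
      case Some(true) => Some(true)    // left already decides: right is never evaluated
      case _ => r match {
        case Some(true) => Some(true)
        case Some(false) if l == Some(false) => Some(false)
        case _ => None
      }
    }

    assert(and3(Some(false), sys.error("right side never evaluated")) == Some(false))
    assert(and3(None, Some(true)).isEmpty)         // NULL AND TRUE  = NULL
    assert(and3(None, Some(false)) == Some(false)) // NULL AND FALSE = FALSE
    assert(or3(None, Some(true)) == Some(true))    // NULL OR TRUE   = TRUE

FALSE short-circuits AND and TRUE short-circuits OR even when the other side is NULL, which is what makes the early exits in the new eval bodies correct.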
@@ -133,8 +145,12 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison
   def symbol = "="
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == null || r == null) null else l == r
+    if (l == null) {
+      null
+    } else {
+      val r = right.eval(input)
+      if (r == null) null else l == r
+    }
   }
 }
 
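Equals follows the same pattern in miniature: NULL on either side yields NULL, and the right side is only evaluated once the left is known to be non-NULL. A hypothetical plain-Scala equivalent:

    def eq3(l: Any, r: => Any): Any =
      if (l == null) null
      else { val rv = r; if (rv == null) null else l == rv }

    assert(eq3(1, 1) == true)
    assert(eq3(null, sys.error("right side never evaluated")) == null)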
@@ -162,7 +178,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
   extends Expression {
 
   def children = predicate :: trueValue :: falseValue :: Nil
-  def nullable = trueValue.nullable || falseValue.nullable
+  override def nullable = trueValue.nullable || falseValue.nullable
   def references = children.flatMap(_.references).toSet
   override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
   def dataType = {
@@ -175,8 +191,9 @@
   }
 
   type EvaluatedType = Any
+
   override def eval(input: Row): Any = {
-    if (predicate.eval(input).asInstanceOf[Boolean]) {
+    if (true == predicate.eval(input)) {
       trueValue.eval(input)
     } else {
       falseValue.eval(input)
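
The `true == predicate.eval(input)` form sidesteps casting the result to Boolean: a NULL (or any non-true) predicate now simply selects the false branch. A sketch of the three outcomes:

    def pick(pred: Any, t: String, f: String): String = if (true == pred) t else f

    assert(pick(true, "t", "f") == "t")
    assert(pick(false, "t", "f") == "f")
    assert(pick(null, "t", "f") == "f")  // NULL predicate falls through to falseValue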
16 changes: 16 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -233,6 +233,11 @@ private[hive] object HiveQl {
       }
     } catch {
       case e: Exception => throw new ParseException(sql, e)
+      case e: NotImplementedError => sys.error(
+        s"""
+          |Unsupported language features in query: $sql
+          |${dumpTree(getAst(sql))}
+        """.stripMargin)
     }
   }

@@ -865,6 +870,17 @@ private[hive] object HiveQl {
       IsNull(nodeToExpr(child))
     case Token("TOK_FUNCTION", Token("IN", Nil) :: value :: list) =>
       In(nodeToExpr(value), list.map(nodeToExpr))
+    case Token("TOK_FUNCTION",
+           Token("between", Nil) ::
+           Token("KW_FALSE", Nil) ::
+           target ::
+           minValue ::
+           maxValue :: Nil) =>
+
+      val targetExpression = nodeToExpr(target)
+      And(
+        GreaterThanOrEqual(targetExpression, nodeToExpr(minValue)),
+        LessThanOrEqual(targetExpression, nodeToExpr(maxValue)))
 
     /* Boolean Logic */
     case Token(AND(), left :: right:: Nil) => And(nodeToExpr(left), nodeToExpr(right))
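
The new case desugars BETWEEN into a bounds conjunction: `key BETWEEN 1 AND 2` becomes `key >= 1 AND key <= 2`. (The KW_FALSE child appears to mark the non-negated form; only that variant is matched here, so treat this reading as an assumption.) The predicate in miniature:

    def between(target: Int, min: Int, max: Int): Boolean = target >= min && target <= max

    assert(between(2, 1, 2))   // consistent with the golden row `2 val_2` for the new test below
    assert(!between(3, 1, 2))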
28 changes: 21 additions & 7 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression])
     isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
   }
 
+  protected lazy val deferedObjects = Array.fill[DeferredObject](children.length)({
+    new DeferredObjectAdapter
+  })
+
+  // Adapter from Catalyst ExpressionResult to Hive DeferredObject
+  class DeferredObjectAdapter extends DeferredObject {
+    private var func: () => Any = _
+    def set(func: () => Any) {
+      this.func = func
+    }
+    override def prepare(i: Int) = {}
+    override def get(): AnyRef = wrap(func())
+  }
+
   val dataType: DataType = inspectorToDataType(returnInspector)
 
   override def eval(input: Row): Any = {
     returnInspector // Make sure initialized.
-    val args = children.map { v =>
-      new DeferredObject {
-        override def prepare(i: Int) = {}
-        override def get(): AnyRef = wrap(v.eval(input))
-      }
-    }.toArray
-    unwrap(function.evaluate(args))
+    var i = 0
+    while (i < children.length) {
+      val idx = i
+      deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)})
+      i += 1
+    }
+    unwrap(function.evaluate(deferedObjects))
   }
 }
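
The change replaces a per-row allocation of anonymous DeferredObjects with a one-time array of adapters that are re-pointed at each row's values; `val idx = i` pins the loop index so each closure captures a stable value rather than the mutating var. The pattern in isolation, with hypothetical names:

    class Slot {  // plays the role of DeferredObjectAdapter
      private var thunk: () => Any = _
      def set(f: () => Any): Unit = { thunk = f }
      def get(): Any = thunk()
    }

    val slots = Array.fill(3)(new Slot)  // allocated once, like deferedObjects
    def evalRow(row: Array[Int]): Seq[Any] = {
      var i = 0
      while (i < slots.length) {
        val idx = i                      // stable capture, as in the diff above
        slots(i).set(() => row(idx))
        i += 1
      }
      slots.map(_.get()).toSeq
    }

    assert(evalRow(Array(1, 2, 3)) == Seq(1, 2, 3))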
@@ -0,0 +1 @@
2 val_2
@@ -24,6 +24,10 @@ import org.apache.spark.sql.hive.test.TestHive._
  */
 class HiveQuerySuite extends HiveComparisonTest {
 
+  createQueryTest("between",
+    "SELECT * FROM src WHERE key between 1 and 2"
+  )
+
   test("Query expressed in SQL") {
     assert(sql("SELECT 1").collect() === Array(Seq(1)))
   }

