Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Apr 3, 2015
1 parent 07ce92f commit 925203b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ private[sql] class SQLConf extends Serializable {
getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt

/**
* By default it will choose sort merge join.
* By default not choose sort merge join.
*/
private[spark] def autoSortMergeJoin: Boolean =
getConf(AUTO_SORTMERGEJOIN, true.toString).toBoolean
getConf(AUTO_SORTMERGEJOIN, false.toString).toBoolean

/**
* The default size in bytes to assign to a logical operator's estimation statistics. By default,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,11 @@ case class SortMergeJoin(
stop = ordering.compare(leftKey, rightKey) == 0
}
if (rightMatches.size > 0) {
stop = false
leftMatches = new CompactBuffer[Row]()
val leftMatch = leftKey.copy()
while (!stop && leftElement != null) {
while (ordering.compare(leftKey, leftMatch) == 0 && leftElement != null) {
leftMatches += leftElement
fetchLeft()
// exit loop when run out of left matches
stop = ordering.compare(leftKey, leftMatch) != 0
}
}

Expand Down
16 changes: 13 additions & 3 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
case j: CartesianProduct => j
case j: BroadcastNestedLoopJoin => j
case j: BroadcastLeftSemiJoinHash => j
case j: ShuffledHashJoin => j
case j: SortMergeJoin => j
}

Expand All @@ -63,6 +64,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("join operator selection") {
cacheManager.clearCache()

val AUTO_SORTMERGEJOIN: Boolean = conf.autoSortMergeJoin
conf.setConf("spark.sql.autoSortMergeJoin", "false")
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]),
("SELECT * FROM testData LEFT SEMI JOIN testData2", classOf[LeftSemiJoinBNL]),
Expand All @@ -76,9 +79,9 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2", classOf[CartesianProduct]),
("SELECT * FROM testData JOIN testData2 WHERE key > a", classOf[CartesianProduct]),
("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a", classOf[CartesianProduct]),
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[ShuffledHashJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[ShuffledHashJoin]),
("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[HashOuterJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
classOf[HashOuterJoin]),
Expand All @@ -92,6 +95,13 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
classOf[BroadcastNestedLoopJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
conf.setConf("spark.sql.autoSortMergeJoin", "true")
Seq(
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
conf.setConf("spark.sql.autoSortMergeJoin", AUTO_SORTMERGEJOIN.toString)
}

test("broadcasted hash join operator selection") {
Expand Down

0 comments on commit 925203b

Please sign in to comment.