Skip to content

Commit

Permalink
[SQL] Code Cleanup: Left Semi Hash Join
Browse files Browse the repository at this point in the history
Some improvements for PR #837: add another case to the whitelist and use `filter` to build the result iterator.

Author: Daoyuan <daoyuan.wang@intel.com>

Closes #1049 from adrian-wang/clean-LeftSemiJoinHash and squashes the following commits:

b314d5a [Daoyuan] change hashSet name
27579a9 [Daoyuan] add semijoin to white list and use filter to create new iterator in LeftSemiJoinBNL

Signed-off-by: Michael Armbrust <michael@databricks.com>
  • Loading branch information
adrian-wang authored and marmbrus committed Jun 11, 2014
1 parent 4107cce commit ce6deb1
Show file tree
Hide file tree
Showing 52 changed files with 374 additions and 33 deletions.
40 changes: 7 additions & 33 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,51 +169,25 @@ case class LeftSemiJoinHash(
def execute() = {

buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashTable = new java.util.HashSet[Row]()
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null

// Create a Hash set of buildKeys
while (buildIter.hasNext) {
currentRow = buildIter.next()
val rowKey = buildSideKeyGenerator(currentRow)
if(!rowKey.anyNull) {
val keyExists = hashTable.contains(rowKey)
val keyExists = hashSet.contains(rowKey)
if (!keyExists) {
hashTable.add(rowKey)
hashSet.add(rowKey)
}
}
}

new Iterator[Row] {
private[this] var currentStreamedRow: Row = _
private[this] var currentHashMatched: Boolean = false

private[this] val joinKeys = streamSideKeyGenerator()

override final def hasNext: Boolean =
streamIter.hasNext && fetchNext()

override final def next() = {
currentStreamedRow
}

/**
* Searches the streamed iterator for the next row that has at least one match in hashtable.
*
* @return true if the search is successful, and false if the streamed iterator runs out of
* tuples.
*/
private final def fetchNext(): Boolean = {
currentHashMatched = false
while (!currentHashMatched && streamIter.hasNext) {
currentStreamedRow = streamIter.next()
if (!joinKeys(currentStreamedRow).anyNull) {
currentHashMatched = hashTable.contains(joinKeys.currentValue)
}
}
currentHashMatched
}
}
val joinKeys = streamSideKeyGenerator()
streamIter.filter(current => {
!joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
})
}
}
}
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
0 val_0
0 val_0
0 val_0
2 val_2
4 val_4
5 val_5
5 val_5
5 val_5
8 val_8
9 val_9
10 val_10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
0 val_0
0 val_0
0 val_0
4 val_2
8 val_4
10 val_5
10 val_5
10 val_5
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
val_0
val_0
val_0
val_10
val_2
val_4
val_5
val_5
val_5
val_8
val_9
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
0 val_0
0 val_0
0 val_0
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
val_10
val_8
val_9
Empty file.
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
4 val_2
8 val_4
10 val_5
10 val_5
10 val_5
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
0
0
0
0
0
0
2
4
4
5
5
5
8
8
9
10
10
10
10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
0 val_0
0 val_0
0 val_0
8 val_8
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
0 val_0 0 val_0
4 val_4 4 val_2
8 val_8 8 val_4
10 val_10 10 val_5
10 val_10 10 val_5
10 val_10 10 val_5
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
0 val_0
0 val_0
0 val_0
4 val_2
8 val_4
10 val_5
10 val_5
10 val_5
16 val_8
18 val_9
20 val_10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
0 val_0
0 val_0
0 val_0
0 val_0
0 val_0
0 val_0
2 val_2
4 val_4
5 val_5
5 val_5
5 val_5
8 val_8
9 val_9
10 val_10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
16
18
20
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
NULL
NULL
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
4
4
8
8
10
10
10
10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
NULL
NULL
NULL
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
2
4
4
5
5
5
8
8
9
10
10
10
10
10
10
10
10
10
10
10
10
Empty file.
Loading

0 comments on commit ce6deb1

Please sign in to comment.