Skip to content

Commit

Permalink
Push null check into buffered iterator next().
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Aug 9, 2015
1 parent 7d3cc5d commit f83b412
Showing 1 changed file with 19 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ private[joins] class SortMergeJoinScanner(
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
// Note: this is guaranteed to never have any null columns:
private[this] var bufferedRowKey: InternalRow = _
/**
* The join key for the rows buffered in `bufferedMatches`, or null if `bufferedMatches` is empty
Expand All @@ -157,7 +158,7 @@ private[joins] class SortMergeJoinScanner(
private[this] val bufferedMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]

// Initialization (note: do _not_ want to advance streamed here).
advancedBuffered()
advancedBufferedToRowWithNullFreeJoinKey()

// --- Public methods ---------------------------------------------------------------------------

Expand Down Expand Up @@ -196,11 +197,10 @@ private[joins] class SortMergeJoinScanner(
do {
if (streamedRowKey.anyNull) {
advancedStreamed()
} else if (bufferedRowKey.anyNull) {
advancedBuffered()
} else {
assert(!bufferedRowKey.anyNull)
comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
if (comp > 0) advancedBuffered()
if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
else if (comp < 0) advancedStreamed()
}
} while (streamedRow != null && bufferedRow != null && comp != 0)
Expand Down Expand Up @@ -242,15 +242,10 @@ private[joins] class SortMergeJoinScanner(
if (bufferedRow != null && !streamedRowKey.anyNull) {
// The buffered iterator could still contain matching rows, so we'll need to walk through
// it until we either find matches or pass where they would be found.
var comp =
if (bufferedRowKey.anyNull) 1 else keyOrdering.compare(streamedRowKey, bufferedRowKey)
while (comp > 0 && advancedBuffered()) {
comp = if (bufferedRowKey.anyNull) {
1
} else {
keyOrdering.compare(streamedRowKey, bufferedRowKey)
}
}
var comp = 1
do {
comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
} while (comp > 0 && advancedBufferedToRowWithNullFreeJoinKey())
if (comp == 0) {
// We have found matches, so buffer them (this updates matchJoinKey)
bufferMatchingRows()
Expand Down Expand Up @@ -283,18 +278,22 @@ private[joins] class SortMergeJoinScanner(
}

/**
* Advance the buffered iterator and compute the new row's join key.
* Advance the buffered iterator until we find a row with join key that does not contain nulls.
* @return true if the buffered iterator returned a row and false otherwise.
*/
private def advancedBuffered(): Boolean = {
if (bufferedIter.advanceNext()) {
private def advancedBufferedToRowWithNullFreeJoinKey(): Boolean = {
var foundRow: Boolean = false
while (!foundRow && bufferedIter.advanceNext()) {
bufferedRow = bufferedIter.getRow
bufferedRowKey = bufferedKeyGenerator(bufferedRow)
true
} else {
foundRow = !bufferedRowKey.anyNull
}
if (!foundRow) {
bufferedRow = null
bufferedRowKey = null
false
} else {
true
}
}

Expand All @@ -312,11 +311,7 @@ private[joins] class SortMergeJoinScanner(
bufferedMatches.clear()
do {
bufferedMatches += bufferedRow.copy() // need to copy mutable rows before buffering them
advancedBuffered()
} while (
bufferedRow != null &&
!bufferedRowKey.anyNull &&
keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0
)
advancedBufferedToRowWithNullFreeJoinKey()
} while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
}
}

0 comments on commit f83b412

Please sign in to comment.