Skip to content

Commit

Permalink
[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doe…
Browse files Browse the repository at this point in the history
…sn't work all the time

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.
  • Loading branch information
wesleymiao committed May 3, 2015
1 parent ea841ef commit 2611745
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ case class Time(private val millis: Long) {
new Time((this.millis / t) * t)
}

def floor(that: Duration, zeroTime: Time): Time = {
val t = that.milliseconds
new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
}

def isMultipleOf(that: Duration): Boolean =
(this.millis % that.milliseconds == 0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,8 @@ abstract class DStream[T: ClassTag] (
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
val alignedToTime = toTime.floor(slideDuration)
val alignedFromTime = fromTime.floor(slideDuration)
val alignedToTime = toTime.floor(slideDuration, zeroTime)
val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

logInfo("Slicing from " + fromTime + " to " + toTime +
" (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
Expand Down

0 comments on commit 2611745

Please sign in to comment.