Skip to content

Commit

Permalink
[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doe…
Browse files Browse the repository at this point in the history
…sn't work all the time

Improve it a bit to only do floor when it is not aligned relative to its zeroTime
  • Loading branch information
wesleymiao committed May 4, 2015
1 parent 2611745 commit 6ade399
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -762,16 +762,22 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new SparkException(this + " has not been initialized")
}
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")

val alignedToTime = (toTime - zeroTime).isMultipleOf(slideDuration) match {
case true => toTime
case false =>
logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
toTime.floor(slideDuration, zeroTime)
}
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("

val alignedFromTime = (fromTime - zeroTime).isMultipleOf(slideDuration) match {
case true => fromTime
case false =>
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
fromTime.floor(slideDuration, zeroTime)
}
val alignedToTime = toTime.floor(slideDuration, zeroTime)
val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

logInfo("Slicing from " + fromTime + " to " + toTime +
" (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
Expand Down

0 comments on commit 6ade399

Please sign in to comment.