diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index b606c236e3234..96078de689388 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -762,16 +762,22 @@ abstract class DStream[T: ClassTag] ( if (!isInitialized) { throw new SparkException(this + " has not been initialized") } - if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + + val alignedToTime = (toTime - zeroTime).isMultipleOf(slideDuration) match { + case true => toTime + case false => + logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") + toTime.floor(slideDuration, zeroTime) } - if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + + val alignedFromTime = (fromTime - zeroTime).isMultipleOf(slideDuration) match { + case true => fromTime + case false => + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + fromTime.floor(slideDuration, zeroTime) } - val alignedToTime = toTime.floor(slideDuration, zeroTime) - val alignedFromTime = fromTime.floor(slideDuration, zeroTime) logInfo("Slicing from " + fromTime + " to " + toTime + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")