Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time #5871

Closed
wants to merge 4 commits into from

Conversation

wesleymiao
Copy link
Contributor

@tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in DStream.slice() implementation, as shown below.

def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
if (!isInitialized) {
throw new SparkException(this + " has not been initialized")
}
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
val alignedToTime = toTime.floor(slideDuration, zeroTime)
val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

logInfo("Slicing from " + fromTime + " to " + toTime +
  " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
  if (time >= zeroTime) getOrCompute(time) else None
})

}

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

def floor(that: Duration, zeroTime: Time): Time = {
val t = that.milliseconds
new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
}

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

val alignedToTime = toTime.floor(slideDuration, zeroTime)
val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are really aligned in respect to zeroTime whose value is not really a 0.

…sn't work all the time

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@srowen
Copy link
Member

srowen commented May 3, 2015

The intent of the code does seem to be that the difference from zeroTime is what is supposed to be a multiple of slideDuration. From my understanding I agree with your change. Let's see what the tests say.

@srowen
Copy link
Member

srowen commented May 3, 2015

ok to test

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented May 3, 2015

Test build #31691 has started for PR 5871 at commit 2611745.

@SparkQA
Copy link

SparkQA commented May 3, 2015

Test build #31691 has finished for PR 5871 at commit 2611745.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member

viirya commented May 3, 2015

You will not directly call slice(). When slice() is called internally, e.g. in WindowedDStream.compute(), the Interval given as parameter to slice is valid. So if it is not trying to directly call slice, calling floor() to align fromTime and toTime is unnecessary.

As you pointed, the current floor is buggy. That is why your example runs incorrectly. In fact, just commenting out two floor callings makes your example passed.

Is it better to only perform alignment when the isMultipleOf checking is failed?

@AmplabJenkins
Copy link

Merged build finished. Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31691/
Test PASSed.

…sn't work all the time

Improve it a bit to only do floor when it is not aligned relative to its zeroTime
@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented May 4, 2015

Test build #31715 has started for PR 5871 at commit 6ade399.

@wesleymiao
Copy link
Contributor Author

@viirya , you are right. I've changed a little bit to only perform floor when the fromTime and toTime are not aligned relative to zeroTime.

Also fixed a typo in the log message.

@SparkQA
Copy link

SparkQA commented May 4, 2015

Test build #31715 has finished for PR 5871 at commit 6ade399.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Merged build finished. Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31715/
Test PASSed.

logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")

val alignedToTime = (toTime - zeroTime).isMultipleOf(slideDuration) match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think match syntax for booleans is needlessly complicated. You can just write a more straightforward if.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to if-else.

…sn't work all the time

In response to comment, change match-case to if-else for boolean to be consistent.
@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented May 4, 2015

Test build #31730 has started for PR 5871 at commit 48b4dc0.

@SparkQA
Copy link

SparkQA commented May 4, 2015

Test build #31730 has finished for PR 5871 at commit 48b4dc0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Merged build finished. Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31730/
Test PASSed.

@@ -63,6 +63,11 @@ case class Time(private val millis: Long) {
new Time((this.millis / t) * t)
}

def floor(that: Duration, zeroTime: Time): Time = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add some tests for this new method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be made public? If so, would be good to add some docs.

@srowen
Copy link
Member

srowen commented May 8, 2015

A little test would be great, yeah. Would still love a quick look from @tdas to make sure we're not missing something.

…en't work all the time

Add some tests to cover the new function Time.floor(duration, zeroTime)
@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented May 8, 2015

Test build #32230 has started for PR 5871 at commit 82a4d8c.

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32230/
Test FAILed.

@shaneknapp
Copy link
Contributor

i'll retrigger this job once i restart jenkins

@shaneknapp
Copy link
Contributor

jenkins, test this please

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented May 8, 2015

Test build #32237 has started for PR 5871 at commit 82a4d8c.

@SparkQA
Copy link

SparkQA commented May 8, 2015

Test build #32237 has finished for PR 5871 at commit 82a4d8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Merged build finished. Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32237/
Test PASSed.

@wesleymiao
Copy link
Contributor Author

Hi, @tdas , will you be able to take a look at this PR? My team in Autodesk is trying to adopt Spark Streaming to do some real-time analytics for our services' API logs to serve various purposes, among which is to monitor our services' health in real-time based on API log statistics. Then we run into this situation where we need to perform reduceByKeyAndWindow on an already windowed DStream.

What I'd like to achieve is to do multiple-level reduce on the source of API logging stream. For example - let's say the source logging stream is at interval 1 second. The first level aggregation would be every 1 minute, which is the 60 intervals of the source stream. The second level would be every 1 hour. The third level would be 1 day. And we can do more levels if we want.

What I hope is that at each level we'll do reduceByKeyAndWindow so that its aggregation can be done over its immediate previous level, instead of always aggregating over the source stream. Level 3's reduceByKey will be based on level 2's result, level 2 is based one level 1 and level 1 is based on the source stream.

I would think this approach will be more efficient than always reducing over the source stream, particularly for the higher level (like daily and weekly) aggregation.

@srowen
Copy link
Member

srowen commented May 11, 2015

I'm proceeding with this since everywhere in the code it appears that the intent is for RDDs to start an end on a multiple of the slide duration after zero time. Therefore it seems like the intent must have been to take into account zeroTime.

asfgit pushed a commit that referenced this pull request May 11, 2015
…sn't work all the time

tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.

Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>

Closes #5871 from wesleymiao/spark-7326 and squashes the following commits:

82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time

(cherry picked from commit d70a076)
Signed-off-by: Sean Owen <sowen@cloudera.com>
@asfgit asfgit closed this in d70a076 May 11, 2015
fromTime
} else {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect indentation.

@tdas
Copy link
Contributor

tdas commented May 11, 2015

Aah, this has already been merged before I got to it. That's okay, barring a few nits, it was good. Thanks @wesleymiao @srowen

jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
…sn't work all the time

tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.

Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>

Closes apache#5871 from wesleymiao/spark-7326 and squashes the following commits:

82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
…sn't work all the time

tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.

Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>

Closes apache#5871 from wesleymiao/spark-7326 and squashes the following commits:

82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
…sn't work all the time

tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.

Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>

Closes apache#5871 from wesleymiao/spark-7326 and squashes the following commits:

82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants