Skip to content

Commit

Permalink
[SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if…
Browse files Browse the repository at this point in the history
… record is available

## What changes were proposed in this pull request?

Avoid unnecessary sleep (10 ms) in each invocation of MemoryStreamDataReader.next.

## How was this patch tested?

Ran ContinuousSuite from IDE.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Arun Mahadevan <arunm@apache.org>

Closes apache#21207 from arunmahadevan/memorystream.
  • Loading branch information
arunmahadevan authored and jerryshao committed May 4, 2018
1 parent 0c23e25 commit 7f1b6b1
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,10 @@ class ContinuousMemoryStreamDataReader(
private var current: Option[Row] = None

override def next(): Boolean = {
current = None
current = getRecord
while (current.isEmpty) {
Thread.sleep(10)
current = endpoint.askSync[Option[Row]](
GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset)))
current = getRecord
}
currentOffset += 1
true
Expand All @@ -199,6 +198,10 @@ class ContinuousMemoryStreamDataReader(

override def getOffset: ContinuousMemoryStreamPartitionOffset =
ContinuousMemoryStreamPartitionOffset(partition, currentOffset)

private def getRecord: Option[Row] =
endpoint.askSync[Option[Row]](
GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset)))
}

case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int])
Expand Down

0 comments on commit 7f1b6b1

Please sign in to comment.