Skip to content

Commit

Permalink
Merge pull request #321 from benesch/opt-stream-consumer
Browse files Browse the repository at this point in the history
Optimistically avoid locking in StreamConsumer
  • Loading branch information
benesch authored Jan 4, 2021
2 parents 38099da + 1c1945b commit 566c206
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,22 @@ where
type Item = KafkaResult<BorrowedMessage<'a>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Unconditionally store the waker so that we are woken up if the queue
// flips from non-empty to empty. We have to store the waker on every
// call to poll in case this future migrates between tasks. We also need
// to store the waker *before* calling poll to avoid a race where `poll`
// returns None to indicate that the queue is empty, but the queue
// becomes non-empty before we've installed the waker.
// If there is a message ready, yield it immediately to avoid the
// taking the lock in `self.set_waker`.
if let Some(message) = self.poll() {
return Poll::Ready(Some(message));
}

// Otherwise, we need to wait for a message to become available. Store
// the waker so that we are woken up if the queue flips from non-empty
// to empty. We have to store the waker repatedly in case this future
// migrates between tasks.
self.set_waker(cx.waker().clone());

// Check whether a new message became available after we installed the
// waker. This avoids a race where `poll` returns None to indicate that
// the queue is empty, but the queue becomes non-empty before we've
// installed the waker.
match self.poll() {
None => Poll::Pending,
Some(message) => Poll::Ready(Some(message)),
Expand Down

0 comments on commit 566c206

Please sign in to comment.