Skip to content

Commit

Permalink
Merge pull request #48 from smol-rs/notgull/evl5
Browse files Browse the repository at this point in the history
Port to event-listener v5.0
  • Loading branch information
zeenix authored Feb 8, 2024
2 parents c2ec71e + 3c45a87 commit ab8a1eb
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 33 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ name = "broadcast_bench"
[features]

[dependencies]
event-listener = "3"
event-listener-strategy = "0.1.0"
event-listener = "5.0.0"
event-listener-strategy = "0.5.0"
futures-core = "0.3.21"
pin-project-lite = "0.2.13"

[dev-dependencies]
criterion = "0.3.5"
Expand Down
73 changes: 42 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ use std::convert::TryInto;
use std::error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture};
use futures_core::{ready, stream::Stream};
use pin_project_lite::pin_project;

/// Create a new broadcast channel.
///
Expand Down Expand Up @@ -695,6 +697,7 @@ impl<T: Clone> Sender<T> {
sender: self,
listener: None,
msg: Some(msg),
_pin: PhantomPinned
})
}

Expand Down Expand Up @@ -785,7 +788,7 @@ pub struct Receiver<T> {
pos: u64,

/// Listens for a send or close event to unblock this stream.
listener: Option<Pin<Box<EventListener>>>,
listener: Option<EventListener>,
}

impl<T> Receiver<T> {
Expand Down Expand Up @@ -1185,6 +1188,7 @@ impl<T: Clone> Receiver<T> {
Recv::_new(RecvInner {
receiver: self,
listener: None,
_pin: PhantomPinned
})
}

Expand Down Expand Up @@ -1596,25 +1600,28 @@ easy_wrapper! {
pub(crate) wait();
}

#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
// TODO: Remove the Pin<Box<>> at the next breaking release and make this type !Unpin
listener: Option<Pin<Box<EventListener>>>,
msg: Option<T>,
}
pin_project! {
#[derive(Debug)]
struct SendInner<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
msg: Option<T>,

impl<'a, T> Unpin for SendInner<'a, T> {}
// Keeping this type `!Unpin` enables future optimizations.
#[pin]
_pin: PhantomPinned
}
}

impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> {
type Output = Result<Option<T>, SendError<T>>;

fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = Pin::new(self);
let this = self.project();

loop {
let msg = this.msg.take().unwrap();
Expand All @@ -1633,24 +1640,24 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> {
return Poll::Ready(Ok(msg));
}
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => this.msg = Some(m),
Err(TrySendError::Full(m)) => *this.msg = Some(m),
Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => {
this.msg = Some(m)
*this.msg = Some(m)
}
Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
}

// Sending failed - now start listening for notifications or wait for one.
match &mut this.listener {
match &this.listener {
None => {
// Start listening and then try sending again.
let inner = inner.write().unwrap();
this.listener = Some(inner.send_ops.listen());
*this.listener = Some(inner.send_ops.listen());
}
Some(l) => {
Some(_) => {
// Wait for a notification.
ready!(strategy.poll(l.as_mut(), context));
this.listener = None;
ready!(strategy.poll(this.listener, context));
*this.listener = None;
}
}
}
Expand All @@ -1665,23 +1672,27 @@ easy_wrapper! {
pub(crate) wait();
}

#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a mut Receiver<T>,
listener: Option<Pin<Box<EventListener>>>,
}
pin_project! {
#[derive(Debug)]
struct RecvInner<'a, T> {
receiver: &'a mut Receiver<T>,
listener: Option<EventListener>,

impl<'a, T> Unpin for RecvInner<'a, T> {}
// Keeping this type `!Unpin` enables future optimizations.
#[pin]
_pin: PhantomPinned
}
}

impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> {
type Output = Result<T, RecvError>;

fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
self: Pin<&mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = Pin::new(self);
let this = self.project();

loop {
// Attempt to receive a message.
Expand All @@ -1695,18 +1706,18 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> {
}

// Receiving failed - now start listening for notifications or wait for one.
match &mut this.listener {
match &this.listener {
None => {
// Start listening and then try receiving again.
this.listener = {
*this.listener = {
let inner = this.receiver.inner.write().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(l) => {
Some(_) => {
// Wait for a notification.
ready!(strategy.poll(l.as_mut(), context));
this.listener = None;
ready!(strategy.poll(this.listener, context));
*this.listener = None;
}
}
}
Expand Down

0 comments on commit ab8a1eb

Please sign in to comment.