Skip to content

Commit

Permalink
UnboundedMemoryDuplex + comments
Browse files Browse the repository at this point in the history
  • Loading branch information
n-lebel committed Nov 25, 2024
1 parent 8d8a546 commit 75ef318
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion serio/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Stream for UnboundedMemoryStream {
}
}

/// Creates a new memory channel with the specified buffer size.
/// Creates a new unbounded memory channel.
pub fn unbounded() -> (UnboundedMemorySink, UnboundedMemoryStream) {
let (sender, receiver) = mpsc::unbounded();
(UnboundedMemorySink(sender), UnboundedMemoryStream(receiver))
Expand Down Expand Up @@ -207,6 +207,74 @@ pub fn duplex(buffer: usize) -> (MemoryDuplex, MemoryDuplex) {
)
}

/// An unbounded memory duplex that can be used to send and receive any serializable types.
#[derive(Debug)]
pub struct UnboundedMemoryDuplex {
sink: UnboundedMemorySink,
stream: UnboundedMemoryStream,
}

impl UnboundedMemoryDuplex {
/// Returns the inner sink and stream.
pub fn into_inner(self) -> (UnboundedMemorySink, UnboundedMemoryStream) {
(self.sink, self.stream)
}

/// Returns a reference to the inner sink.
pub fn sink_mut(&mut self) -> &mut UnboundedMemorySink {
&mut self.sink
}

/// Returns a reference to the inner stream.
pub fn stream_mut(&mut self) -> &mut UnboundedMemoryStream {
&mut self.stream
}
}

impl Sink for UnboundedMemoryDuplex {
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_ready(cx)
}

fn start_send<Item: Serialize>(
mut self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.sink).start_send(item)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.sink).poll_close(cx)
}
}

impl Stream for UnboundedMemoryDuplex {
type Error = Error;

fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}

/// Creates a new unbounded memory duplex.
pub fn unbounded_duplex() -> (UnboundedMemoryDuplex, UnboundedMemoryDuplex) {
let (a, b) = unbounded();
let (c, d) = unbounded();
(
UnboundedMemoryDuplex { sink: a, stream: d },
UnboundedMemoryDuplex { sink: c, stream: b },
)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 75ef318

Please sign in to comment.