Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: socket/channel closed detection #337

Merged
merged 14 commits into from
Jan 6, 2024
107 changes: 100 additions & 7 deletions matchbox_socket/src/webrtc_socket/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl WebRtcSocketBuilder {
/// Sets the number of attempts to make at reconnecting to the signaling server,
/// if `None` the socket will attempt to connect indefinitely.
///
/// The default is 2 reconnection attempts.
/// The default is 3 reconnection attempts.
pub fn reconnect_attempts(mut self, attempts: Option<u16>) -> Self {
self.config.attempts = attempts;
self
Expand Down Expand Up @@ -349,6 +349,19 @@ pub struct WebRtcChannel {
}

impl WebRtcChannel {
/// Returns whether it's still possible to send messages.
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}

/// Close this channel.
///
/// This prevents sending and receiving any messages in the future, but does not drain messages that are buffered.
pub fn close(&mut self) {
self.tx.close_channel();
self.rx.close();
}

/// Call this where you want to handle new received messages. Returns immediately.
///
/// Messages are removed from the socket when called.
Expand Down Expand Up @@ -430,6 +443,19 @@ impl WebRtcSocket {
}

impl<C: ChannelPlurality> WebRtcSocket<C> {
/// Disconnect from the peer, severing all communication channels.
pub fn disconnect(&mut self, peer: PeerId) {
todo!("Needs attention from @johanhelsing or @garryod")
}

/// Close this socket, disconnecting all channels.
pub fn close(mut self) {
self.channels
.iter_mut()
.filter_map(Option::take)
.for_each(|mut c| c.close());
}

/// Handle peers connecting or disconnecting
///
/// Constructed using [`WebRtcSocketBuilder`].
Expand Down Expand Up @@ -515,6 +541,27 @@ impl<C: ChannelPlurality> WebRtcSocket<C> {
}
}

/// Gets an immutable reference to the [`WebRtcChannel`] of a given id.
///
/// ```
/// use matchbox_socket::*;
///
/// let (mut socket, message_loop) = WebRtcSocketBuilder::new("wss://example.invalid/")
/// .add_channel(ChannelConfig::reliable())
/// .add_channel(ChannelConfig::unreliable())
/// .build();
/// let is_closed = socket.channel(0).is_closed();
/// ```
///
/// See also: [`WebRtcSocket::channel_mut`], [`WebRtcSocket::get_channel`], [`WebRtcSocket::take_channel`]
///
/// # Panics
///
/// will panic if the channel cannot be found.
pub fn channel(&self, channel: usize) -> &WebRtcChannel {
self.get_channel(channel).unwrap()
}

/// Gets a mutable reference to the [`WebRtcChannel`] of a given id.
///
/// ```
Expand All @@ -527,13 +574,36 @@ impl<C: ChannelPlurality> WebRtcSocket<C> {
/// let reliable_channel_messages = socket.channel(0).receive();
/// ```
///
/// See also: [`WebRtcSocket::get_channel`], [`WebRtcSocket::take_channel`]
/// See also: [`WebRtcSocket::channel`], [`WebRtcSocket::get_channel`], [`WebRtcSocket::take_channel`]
///
/// # Panics
///
/// will panic if the channel cannot be found.
pub fn channel(&mut self, channel: usize) -> &mut WebRtcChannel {
self.get_channel(channel).unwrap()
pub fn channel_mut(&mut self, channel: usize) -> &mut WebRtcChannel {
self.get_channel_mut(channel).unwrap()
}

/// Gets an immutable reference to the [`WebRtcChannel`] of a given id.
///
/// Returns an error if the channel was not found.
///
/// ```
/// use matchbox_socket::*;
///
/// let (mut socket, message_loop) = WebRtcSocketBuilder::new("wss://example.invalid/")
/// .add_channel(ChannelConfig::reliable())
/// .add_channel(ChannelConfig::unreliable())
/// .build();
/// let is_closed = socket.get_channel(0).unwrap().is_closed();
/// ```
///
/// See also: [`WebRtcSocket::get_channel_mut`], [`WebRtcSocket::take_channel`]
pub fn get_channel(&self, channel: usize) -> Result<&WebRtcChannel, ChannelError> {
self.channels
.get(channel)
.ok_or(ChannelError::NotFound)?
.as_ref()
.ok_or(ChannelError::Taken)
}

/// Gets a mutable reference to the [`WebRtcChannel`] of a given id.
Expand All @@ -551,7 +621,7 @@ impl<C: ChannelPlurality> WebRtcSocket<C> {
/// ```
///
/// See also: [`WebRtcSocket::channel`], [`WebRtcSocket::take_channel`]
pub fn get_channel(&mut self, channel: usize) -> Result<&mut WebRtcChannel, ChannelError> {
pub fn get_channel_mut(&mut self, channel: usize) -> Result<&mut WebRtcChannel, ChannelError> {
self.channels
.get_mut(channel)
.ok_or(ChannelError::NotFound)?
Expand Down Expand Up @@ -587,13 +657,13 @@ impl WebRtcSocket<SingleChannel> {
///
/// Messages are removed from the socket when called.
pub fn receive(&mut self) -> Vec<(PeerId, Packet)> {
self.channel(0).receive()
self.channel_mut(0).receive()
}

/// Try to send a packet to the given peer. An error is propagated if the socket future
/// is dropped. `Ok` is not a guarantee of delivery.
pub fn try_send(&mut self, packet: Packet, peer: PeerId) -> Result<(), SendError> {
self.channel(0).try_send(packet, peer)
self.channel_mut(0).try_send(packet, peer)
}

/// Send a packet to the given peer. There is no guarantee of delivery.
Expand All @@ -603,6 +673,29 @@ impl WebRtcSocket<SingleChannel> {
pub fn send(&mut self, packet: Packet, peer: PeerId) {
self.try_send(packet, peer).expect("Send failed");
}

/// Returns whether the socket channel is closed
pub fn is_closed(&self) -> bool {
self.channel(0).is_closed()
}
}

impl WebRtcSocket<MultipleChannels> {
/// Returns whether any socket channel is closed
pub fn any_closed(&self) -> bool {
self.channels
.iter()
.filter_map(Option::as_ref)
.any(|c| c.is_closed())
}

/// Returns whether all socket channels are closed
pub fn all_closed(&self) -> bool {
self.channels
.iter()
.filter_map(Option::as_ref)
.all(|c| c.is_closed())
}
}

pub(crate) fn new_senders_and_receivers<T>(
Expand Down