Skip to content

Commit

Permalink
Improve IO component structure (#13)
Browse files Browse the repository at this point in the history
* Improve IO component structure

* fmt

* fix doc

* fmt
  • Loading branch information
aecsocket authored Nov 8, 2024
1 parent 62c2979 commit fd3ec2e
Show file tree
Hide file tree
Showing 19 changed files with 113 additions and 91 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# 0.10.0

- Renamed `Endpoint` to `SessionEndpoint`
- Added `ServerEndpoint` to mark opening servers
- Made `Server` only be added to opened servers, and have this component store the
`opened_at: Instant`
- Removed `Opened`
- All relevant `aeronet_io` types are now registered in the type registry

# 0.9.0

- Redesigned to be Bevy native
Expand Down
4 changes: 2 additions & 2 deletions crates/aeronet_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use {
aeronet_io::{
AeronetIoPlugin, Endpoint, IoSet, Session,
AeronetIoPlugin, IoSet, Session, SessionEndpoint,
connection::{DROP_DISCONNECT_REASON, Disconnect, DisconnectReason, Disconnected},
packet::RecvPacket,
},
Expand Down Expand Up @@ -145,7 +145,7 @@ const MTU: usize = usize::MAX;
fn on_io_added(trigger: Trigger<OnAdd, ChannelIo>, mut commands: Commands) {
let entity = trigger.entity();
let session = Session::new(Instant::now(), MTU);
commands.entity(entity).insert((Endpoint, session));
commands.entity(entity).insert((SessionEndpoint, session));
}

fn on_disconnect(trigger: Trigger<Disconnect>, mut sessions: Query<&mut ChannelIo>) {
Expand Down
4 changes: 2 additions & 2 deletions crates/aeronet_io/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Logic for connection and disconnection of a [`Session`].
use {
crate::{Endpoint, Session},
crate::{Session, SessionEndpoint},
bevy_app::prelude::*,
bevy_derive::Deref,
bevy_ecs::prelude::*,
Expand Down Expand Up @@ -158,7 +158,7 @@ pub struct LocalAddr(pub SocketAddr);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deref, Component)]
pub struct PeerAddr(pub SocketAddr);

fn on_connecting(trigger: Trigger<OnAdd, Endpoint>) {
fn on_connecting(trigger: Trigger<OnAdd, SessionEndpoint>) {
let entity = trigger.entity();
debug!("{entity} connecting");
}
Expand Down
24 changes: 13 additions & 11 deletions crates/aeronet_io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pub struct AeronetIoPlugin;

impl Plugin for AeronetIoPlugin {
fn build(&self, app: &mut App) {
app.register_type::<Session>()
app.register_type::<SessionEndpoint>()
.register_type::<Session>()
.configure_sets(PreUpdate, IoSet::Poll)
.configure_sets(PostUpdate, IoSet::Flush)
.add_plugins((
Expand All @@ -34,14 +35,15 @@ impl Plugin for AeronetIoPlugin {
}
}

/// Represents an [`Entity`] which is establishing a connection to a peer, so
/// that it may open a [`Session`] in the future.
/// Represents an [`Entity`] which may be establishing, or has already
/// established, a connection to a peer.
///
/// This is effectively a marker component for a [`Session`] which isn't
/// connected yet.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Component, Reflect)]
/// - If a session entity only has [`SessionEndpoint`], it is still connecting.
/// - If a session entity has [`SessionEndpoint`] and [`Session`], it has
/// successfully connected.
#[derive(Debug, Component, Reflect)]
#[reflect(Component)]
pub struct Endpoint;
pub struct SessionEndpoint;

/// Represents an [`Entity`] which can be used to transfer [packets] over a
/// connection to a peer session, potentially over a network.
Expand All @@ -68,9 +70,9 @@ pub struct Endpoint;
///
/// After creating a session entity using your chosen IO layer, the entity may
/// not start with the [`Session`] component - the session is *connecting* but
/// is not *connected* yet. This connecting state is marked with the
/// [`Endpoint`] component. Once the IO layer adds [`Session`], the entity is
/// considered *connected*, and you can send and receive data.
/// is not *connected* yet (marked by having [`SessionEndpoint`] but not
/// [`Session`]). Once the IO layer adds [`Session`], the entity is considered
/// *connected*, and you can send and receive data.
///
/// Note that [`Session`] is not a *guarantee* that you can send and receive
/// data - it is always possible that operations on OS sockets fail, the network
Expand Down Expand Up @@ -106,7 +108,7 @@ pub struct Endpoint;
/// [`Disconnect`]: connection::Disconnect
#[derive(Debug, Component, Reflect)]
#[reflect(from_reflect = false, Component)]
// TODO: required component Endpoint
// TODO: required component `SessionEndpoint`
pub struct Session {
connected_at: Instant,
min_mtu: usize,
Expand Down
71 changes: 37 additions & 34 deletions crates/aeronet_io/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,43 @@ pub(crate) struct ServerPlugin;

impl Plugin for ServerPlugin {
fn build(&self, app: &mut App) {
app.register_type::<Server>()
.register_type::<Opened>()
app.register_type::<ServerEndpoint>()
.register_type::<Server>()
.observe(on_opening)
.observe(on_opened)
.observe(on_close)
.observe(on_closed);
}
}

/// Marker component for an [`Entity`] which listens for client connections, and
/// spawns [`Session`]s to communicate with those clients.
/// Represents an [`Entity`] which may be preparing to, or has already started,
/// listening for client connections.
///
/// - If a server entity only has [`ServerEndpoint`], it is still opening.
/// - If a server entity has [`ServerEndpoint`] and [`Server`], it has
/// successfully opened.
#[derive(Debug, Component, Reflect)]
pub struct ServerEndpoint;

/// Represents an [`Entity`] which listens for client connections, and spawns
/// [`Session`]s to communicate with those clients.
///
/// This represents the "server" part of the client/server networking model (a
/// client is represented as just a [`Session`]). Its responsibility is to
/// accept and coordinate connections between multiple clients. Note, however,
/// that this does not have to represent a *dedicated* server - you may run
/// a server, and connect a client to that server, in the same app.
///
/// The server starts in an opening state (when [`Server`] has been added but
/// [`Opened`] is not yet present), and transitions to either an [`Opened`]
/// state, or fails to open and is [`Closed`]. After the server is opened, the
/// server should not close unless there is a fatal server-internal error which
/// affects all connected clients - if a single client causes issues e.g.
/// sending illegal data or breaking some invariant, that single client will be
/// disconnected instead of the entire server being torn down.
/// The server starts in an opening state (when [`ServerEndpoint`] has been
/// added but [`Server`] is not yet present), and transitions to either an
/// opened state, or fails to open and is [`Closed`]. After the server is
/// opened, the server should not close unless there is a fatal server-internal
/// error which affects all connected clients - if a single client causes issues
/// e.g. sending illegal data or breaking some invariant, that single client
/// will be disconnected instead of the entire server being torn down.
///
/// To listen for when a server is opened, add an observer listening for
/// [`Trigger<OnAdd, Server>`].
///
/// When a client connects, it is spawned as a [child] of the server entity.
/// Therefore, to query for sessions spawned under a server, use
Expand All @@ -56,30 +68,21 @@ impl Plugin for ServerPlugin {
///
/// [child]: Children
/// [`Session`]: crate::Session
#[derive(Debug, Clone, Copy, Default, Component, Reflect)]
#[reflect(Component)]
pub struct Server;

/// Component for a [`Server`] which is currently attempting to receive client
/// connections and spawn [`Session`]s.
///
/// To listen for when a server is opened, add an observer listening for
/// [`Trigger<OnAdd, Opened>`].
///
/// [`Session`]: crate::Session
#[derive(Debug, Clone, Copy, Component, Reflect)]
#[reflect(Component)]
pub struct Opened {
/// Instant at which the server was opened.
pub at: Instant,
#[derive(Debug, Component, Reflect)]
#[reflect(from_reflect = false, Component)]
// TODO: required component `ServerEndpoint`
pub struct Server {
opened_at: Instant,
}

impl Opened {
/// Creates an [`Opened`] which indicates that the server was opened
/// [`now`](Instant::now).
impl Server {
/// Creates a new [`Server`].
///
/// - `opened_at`: the instant at which the IO layer acknowledged that the
/// server is now ready to accept client connections.
#[must_use]
pub fn now() -> Self {
Self { at: Instant::now() }
pub const fn new(opened_at: Instant) -> Self {
Self { opened_at }
}
}

Expand Down Expand Up @@ -178,12 +181,12 @@ impl<E> From<E> for CloseReason<E> {
}
}

fn on_opening(trigger: Trigger<OnAdd, Server>) {
fn on_opening(trigger: Trigger<OnAdd, ServerEndpoint>) {
let server = trigger.entity();
debug!("{server} opening");
}

fn on_opened(trigger: Trigger<OnAdd, Opened>) {
fn on_opened(trigger: Trigger<OnAdd, Server>) {
let server = trigger.entity();
debug!("{server} opened");
}
Expand Down
10 changes: 5 additions & 5 deletions crates/aeronet_replicon/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use {
crate::convert,
aeronet_io::{Endpoint, Session, connection::Disconnect, web_time::Instant},
aeronet_io::{Session, SessionEndpoint, connection::Disconnect, web_time::Instant},
aeronet_transport::{AeronetTransportPlugin, Transport, TransportSet},
bevy_app::prelude::*,
bevy_ecs::prelude::*,
Expand Down Expand Up @@ -95,9 +95,9 @@ pub enum ClientTransportSet {
/// - sending messages
/// - all outgoing `replicon` messages are cloned and sent to all sessions
/// - determining connected status
/// - if at least 1 session has [`Session`], [`RepliconClient`] is
/// [`RepliconClientStatus::Connected`]
/// - if at least 1 session has [`Endpoint`], [`RepliconClient`] is
/// - if at least 1 session has both [`SessionEndpoint`] and [`Session`],
/// [`RepliconClient`] is [`RepliconClientStatus::Connected`]
/// - if at least 1 session has [`SessionEndpoint`], [`RepliconClient`] is
/// [`RepliconClientStatus::Connecting`]
/// - else, [`RepliconClientStatus::Disconnected`]
///
Expand Down Expand Up @@ -145,7 +145,7 @@ fn on_client_connected(

fn update_state(
mut replicon_client: ResMut<RepliconClient>,
clients: Query<Option<&Session>, (With<Endpoint>, With<AeronetRepliconClient>)>,
clients: Query<Option<&Session>, (With<SessionEndpoint>, With<AeronetRepliconClient>)>,
) {
let status =
clients.iter().fold(
Expand Down
11 changes: 8 additions & 3 deletions crates/aeronet_replicon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
aeronet_io::{
Session,
connection::{DisconnectReason, Disconnected},
server::{Opened, Server},
server::{Server, ServerEndpoint},
web_time::Instant,
},
aeronet_transport::{AeronetTransportPlugin, Transport, TransportSet},
Expand Down Expand Up @@ -100,7 +100,8 @@ pub enum ServerTransportSet {
/// - the child [`Entity`] (which has [`Session`]) is used as the identifier
/// for the client which sent/receives the message (see [`convert`])
/// - determining server [running] status
/// - if at least 1 server is [`Opened`], [`RepliconServer`] is [running]
/// - if at least 1 entity has both [`ServerEndpoint`] and [`Server`],
/// [`RepliconServer`] is [running]
///
/// Although you can only have one [`RepliconServer`] at a time, it actually
/// makes sense to have multiple [`AeronetRepliconServer`] entities (unlike
Expand All @@ -116,7 +117,11 @@ pub enum ServerTransportSet {
#[reflect(Component)]
pub struct AeronetRepliconServer;

type OpenedServer = (With<Server>, With<Opened>, With<AeronetRepliconServer>);
type OpenedServer = (
With<ServerEndpoint>,
With<Server>,
With<AeronetRepliconServer>,
);

fn update_state(
mut replicon_server: ResMut<RepliconServer>,
Expand Down
4 changes: 2 additions & 2 deletions crates/aeronet_websocket/examples/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use {
aeronet_io::{
Endpoint, Session,
Session, SessionEndpoint,
connection::{Disconnect, DisconnectReason, Disconnected, LocalAddr, PeerAddr},
},
aeronet_websocket::client::{ClientConfig, WebSocketClient, WebSocketClientPlugin},
Expand Down Expand Up @@ -37,7 +37,7 @@ struct SessionUi {
}

fn on_connecting(
trigger: Trigger<OnAdd, Endpoint>,
trigger: Trigger<OnAdd, SessionEndpoint>,
names: Query<&Name>,
mut ui_state: ResMut<GlobalUi>,
) {
Expand Down
9 changes: 5 additions & 4 deletions crates/aeronet_websocket/examples/websocket_echo_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Example server using WebSocket which listens for clients sending strings
//! and sends back a string reply.
use aeronet_io::server::Server;

cfg_if::cfg_if! {
if #[cfg(target_family = "wasm")] {
fn main() {
Expand All @@ -11,8 +13,7 @@ cfg_if::cfg_if! {
use {
aeronet_io::{
connection::{DisconnectReason, Disconnected, LocalAddr},
server::Opened,
Endpoint, Session,
SessionEndpoint, Session,
},
aeronet_websocket::server::{Identity, ServerConfig, WebSocketServer, WebSocketServerPlugin},
bevy::{log::LogPlugin, prelude::*},
Expand Down Expand Up @@ -42,13 +43,13 @@ fn open_server(mut commands: Commands) {
commands.spawn_empty().add(WebSocketServer::open(config));
}

fn on_opened(trigger: Trigger<OnAdd, Opened>, servers: Query<&LocalAddr>) {
fn on_opened(trigger: Trigger<OnAdd, Server>, servers: Query<&LocalAddr>) {
let server = trigger.entity();
let local_addr = servers.get(server).expect("opened server should have a binding socket `LocalAddr`");
info!("{server} opened on {}", **local_addr);
}

fn on_connecting(trigger: Trigger<OnAdd, Endpoint>, clients: Query<&Parent>) {
fn on_connecting(trigger: Trigger<OnAdd, SessionEndpoint>, clients: Query<&Parent>) {
let client = trigger.entity();
let Ok(server) = clients.get(client).map(Parent::get) else {
return;
Expand Down
4 changes: 2 additions & 2 deletions crates/aeronet_websocket/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
session::{self, MTU, SessionError, SessionFrontend, WebSocketIo, WebSocketSessionPlugin},
},
aeronet_io::{
Endpoint, IoSet, Session,
IoSet, Session, SessionEndpoint,
connection::{DisconnectReason, Disconnected},
},
bevy_app::prelude::*,
Expand Down Expand Up @@ -132,7 +132,7 @@ fn connect(session: Entity, world: &mut World, config: ClientConfig, target: Con
);

world.entity_mut(session).insert((
Endpoint, // TODO: required component of WebSocketClient
SessionEndpoint, // TODO: required component of WebSocketClient
WebSocketClient(ClientFrontend::Connecting { recv_dc, recv_next }),
));
}
Expand Down
12 changes: 7 additions & 5 deletions crates/aeronet_websocket/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use {
tungstenite,
},
aeronet_io::{
Endpoint, IoSet,
IoSet, SessionEndpoint,
connection::{DisconnectReason, Disconnected, LocalAddr, PeerAddr},
server::{CloseReason, Closed, Opened, Server},
server::{CloseReason, Closed, Server, ServerEndpoint},
},
bevy_app::prelude::*,
bevy_ecs::{prelude::*, system::EntityCommand},
Expand All @@ -23,6 +23,7 @@ use {
futures::channel::{mpsc, oneshot},
std::io,
tracing::{Instrument, debug_span},
web_time::Instant,
};

/// Allows using [`WebSocketServer`].
Expand Down Expand Up @@ -178,7 +179,7 @@ struct ToConnected {
// TODO: required components
fn on_server_added(trigger: Trigger<OnAdd, WebSocketServer>, mut commands: Commands) {
let server = trigger.entity();
commands.entity(server).insert(Server);
commands.entity(server).insert(ServerEndpoint);
}

fn poll_servers(mut commands: Commands, mut servers: Query<(Entity, &mut WebSocketServer)>) {
Expand Down Expand Up @@ -214,9 +215,10 @@ fn poll_opening(
};
};

let now = Instant::now();
commands
.entity(server)
.insert((Opened::now(), LocalAddr(next.local_addr)));
.insert((Server::new(now), LocalAddr(next.local_addr)));
Frontend::Open {
recv_closed,
recv_connecting: next.recv_connecting,
Expand All @@ -240,7 +242,7 @@ fn poll_open(
.spawn_empty()
.set_parent(server)
.insert((
Endpoint, // TODO: required component of ClientFrontend
SessionEndpoint, // TODO: required component of ClientFrontend
ClientFrontend::Connecting {
recv_dc: connecting.recv_dc,
recv_next: connecting.recv_next,
Expand Down
Loading

0 comments on commit fd3ec2e

Please sign in to comment.