From 5f8c61db2de88eff12545b24c2da300afa75d4c3 Mon Sep 17 00:00:00 2001 From: Florian Pabst Date: Sat, 17 Jun 2023 01:49:15 +0200 Subject: [PATCH 01/28] First setup --- Cargo.toml | 10 ++- renet_steam_transport/Cargo.toml | 16 +++++ renet_steam_transport/src/lib.rs | 7 +++ renet_steam_transport/src/transport/client.rs | 0 renet_steam_transport/src/transport/server.rs | 62 +++++++++++++++++++ 5 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 renet_steam_transport/Cargo.toml create mode 100644 renet_steam_transport/src/lib.rs create mode 100644 renet_steam_transport/src/transport/client.rs create mode 100644 renet_steam_transport/src/transport/server.rs diff --git a/Cargo.toml b/Cargo.toml index 12fd46aa..112fabdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,11 @@ [workspace] -members = ["renet", "demo_chat", "demo_bevy", "renetcode", "bevy_renet", "renet_visualizer"] +members = [ + "renet", + "demo_chat", + "demo_bevy", + "renetcode", + "bevy_renet", + "renet_visualizer", + "renet_steam_transport", +] resolver = "2" diff --git a/renet_steam_transport/Cargo.toml b/renet_steam_transport/Cargo.toml new file mode 100644 index 00000000..56efd7f0 --- /dev/null +++ b/renet_steam_transport/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "renet_steam_transport" +version = "0.0.1" +keywords = ["gamedev", "networking", "transport"] +description = "steam transport for the renet crate: Server/Client network library for multiplayer games" +repository = "https://github.com/lucaspoffo/renet" +license = "MIT OR Apache-2.0" +readme = "README.md" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +renet = { path = "../renet" } +steamworks = "0.10" +renetcode = { path = "../renetcode", version = "0.0.8" } diff --git a/renet_steam_transport/src/lib.rs b/renet_steam_transport/src/lib.rs new file mode 100644 index 00000000..dbceb4e7 --- /dev/null +++ b/renet_steam_transport/src/lib.rs @@ -0,0 +1,7 @@ +const MTU_UNRELIABLE: usize = 1200; // 1200 bytes for unreliable packets before they got split by Steam +const MTU_RELIABLE: usize = 512 * 1024; // 512 KB for reliable packets, they may accept more by recieve but this is the max size of a single packet the they can send + +pub mod transport { + pub mod client; + pub mod server; +} diff --git a/renet_steam_transport/src/transport/client.rs b/renet_steam_transport/src/transport/client.rs new file mode 100644 index 00000000..e69de29b diff --git a/renet_steam_transport/src/transport/server.rs b/renet_steam_transport/src/transport/server.rs new file mode 100644 index 00000000..f5ac6b8b --- /dev/null +++ b/renet_steam_transport/src/transport/server.rs @@ -0,0 +1,62 @@ +use steamworks::{ + networking_sockets::{ListenSocket, NetworkingSockets}, + networking_types::{NetworkingConfigEntry, SendFlags}, + networking_utils::NetworkingUtils, + Client, ClientManager, ServerManager, +}; + +pub struct Server { + listen_socket: ListenSocket, + //renet_server: NetcodeServer, +} + +trait Transport { + fn update(&self); + fn send_packets(&self); +} + +impl Transport for Server { + fn update(&self) { + match self.listen_socket.try_receive_event() { + Some(event) => match event { + steamworks::networking_types::ListenSocketEvent::Connected(event) => { + // TODO register client + event.connection().send_message(b"Hello, world!", SendFlags::UNRELIABLE); + } + steamworks::networking_types::ListenSocketEvent::Disconnected(event) => { + // TODO unregister client + } + steamworks::networking_types::ListenSocketEvent::Connecting(event) => { + // TODO client acceptance check must be done here immediately + } + }, + None => {} + } + } + fn send_packets(&self) {} +} + +impl Server { + pub fn new(client: &Client) -> Self { + // TODO this must be called at the beginning of the application + client.networking_utils().init_relay_network_access(); + let options: Vec = Vec::new(); + let socket; + match client.networking_sockets().create_listen_socket_p2p(0, options) { + Ok(listen_socket) => { + socket = listen_socket; + } + Err(handle) => { + panic!("Failed to create listen socket: {:?}", handle); + } + } + // TODO NetcodeServer should be able to deactivate encryption/decryption, because Steam handles that already + Self { + listen_socket: socket, + //renet_server: NetcodeServer::new() + } + } + + pub fn update(&mut self) {} + pub fn send_packets(&mut self) {} +} From a813f5dd04d5b44863c972eea05575d437293245 Mon Sep 17 00:00:00 2001 From: Florian Pabst Date: Sat, 17 Jun 2023 18:10:18 +0200 Subject: [PATCH 02/28] Added implementation of incoming events --- renet_steam_transport/Cargo.toml | 1 - renet_steam_transport/src/transport/server.rs | 82 ++++++++++++++----- 2 files changed, 61 insertions(+), 22 deletions(-) diff --git a/renet_steam_transport/Cargo.toml b/renet_steam_transport/Cargo.toml index 56efd7f0..06895d92 100644 --- a/renet_steam_transport/Cargo.toml +++ b/renet_steam_transport/Cargo.toml @@ -13,4 +13,3 @@ edition = "2021" [dependencies] renet = { path = "../renet" } steamworks = "0.10" -renetcode = { path = "../renetcode", version = "0.0.8" } diff --git a/renet_steam_transport/src/transport/server.rs b/renet_steam_transport/src/transport/server.rs index f5ac6b8b..373bc06d 100644 --- a/renet_steam_transport/src/transport/server.rs +++ b/renet_steam_transport/src/transport/server.rs @@ -1,43 +1,72 @@ +// TODO add client itself as connection maybe over a const u64 identifier or alternativly its own steamid? + +use std::{collections::HashMap, time::Duration}; + +use renet::RenetServer; use steamworks::{ - networking_sockets::{ListenSocket, NetworkingSockets}, - networking_types::{NetworkingConfigEntry, SendFlags}, - networking_utils::NetworkingUtils, - Client, ClientManager, ServerManager, + networking_sockets::{ListenSocket, NetConnection}, + networking_types::NetworkingConfigEntry, + Client, ClientManager, SteamId, }; +pub struct SteamTransportConfig { + max_clients: usize, +} + pub struct Server { listen_socket: ListenSocket, - //renet_server: NetcodeServer, + config: SteamTransportConfig, + connections: HashMap>, + connections_steam_id: HashMap, } trait Transport { - fn update(&self); - fn send_packets(&self); + fn update(&mut self, duration: Duration, server: &mut RenetServer); + fn send_packets(&self, server: &mut RenetServer); } impl Transport for Server { - fn update(&self) { + fn update(&mut self, duration: Duration, server: &mut RenetServer) { match self.listen_socket.try_receive_event() { Some(event) => match event { steamworks::networking_types::ListenSocketEvent::Connected(event) => { - // TODO register client - event.connection().send_message(b"Hello, world!", SendFlags::UNRELIABLE); - } - steamworks::networking_types::ListenSocketEvent::Disconnected(event) => { - // TODO unregister client + let client_id = server.get_free_id(); + match event.remote().steam_id() { + Some(steam_id) => { + self.connections_steam_id.insert(steam_id, client_id); + } + _ => {} + } + server.add_connection(client_id); + self.connections.insert(client_id, event.take_connection()); } + steamworks::networking_types::ListenSocketEvent::Disconnected(event) => match event.remote().steam_id() { + Some(steam_id) => { + // TODO check if there is a better way + let client_id = self.connections_steam_id.get(&steam_id.clone()).unwrap().clone(); + server.remove_connection(client_id); + self.connections_steam_id.remove(&steam_id); + self.connections.remove(&client_id); + } + None => {} + }, steamworks::networking_types::ListenSocketEvent::Connecting(event) => { - // TODO client acceptance check must be done here immediately + if server.connected_clients() < self.config.max_clients { + let _ = event.accept(); + } else { + event.reject(steamworks::networking_types::NetConnectionEnd::AppGeneric, Some("Too many clients")); + } } }, - None => {} + _ => {} } } - fn send_packets(&self) {} + + fn send_packets(&self, server: &mut RenetServer) {} } impl Server { - pub fn new(client: &Client) -> Self { + pub fn new(client: &Client, config: SteamTransportConfig) -> Self { // TODO this must be called at the beginning of the application client.networking_utils().init_relay_network_access(); let options: Vec = Vec::new(); @@ -47,16 +76,27 @@ impl Server { socket = listen_socket; } Err(handle) => { + // TODO Logging instead of panic panic!("Failed to create listen socket: {:?}", handle); } } - // TODO NetcodeServer should be able to deactivate encryption/decryption, because Steam handles that already Self { listen_socket: socket, - //renet_server: NetcodeServer::new() + config, + connections: HashMap::new(), + connections_steam_id: HashMap::new(), } } +} - pub fn update(&mut self) {} - pub fn send_packets(&mut self) {} +// Extensions for the RenetServer +trait AutoGeneratedId { + fn get_free_id(&self) -> u64; +} + +impl AutoGeneratedId for RenetServer { + fn get_free_id(&self) -> u64 { + let id = self.clients_id().len() as u64 + 1; + id + } } From 1a54af04d7fac213a84cb40f09d2fbbb3cda89fc Mon Sep 17 00:00:00 2001 From: Florian Pabst Date: Sat, 17 Jun 2023 18:10:35 +0200 Subject: [PATCH 03/28] Fix typo --- renet/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/renet/src/server.rs b/renet/src/server.rs index c99387d6..3d815a76 100644 --- a/renet/src/server.rs +++ b/renet/src/server.rs @@ -205,7 +205,7 @@ impl RenetServer { .collect() } - /// Returns the current number of connected clients1. + /// Returns the current number of connected clients. pub fn connected_clients(&self) -> usize { self.connections.iter().filter(|(_, c)| c.is_disconnected()).count() } From 8bbce9890d6ddd3f5e178a235d00da8355998422 Mon Sep 17 00:00:00 2001 From: Florian Pabst Date: Sun, 18 Jun 2023 13:15:50 +0200 Subject: [PATCH 04/28] Added host logic to the server - added send packets --- renet_steam_transport/Cargo.toml | 1 + renet_steam_transport/src/lib.rs | 4 +- renet_steam_transport/src/transport/server.rs | 156 ++++++++++++++++-- 3 files changed, 142 insertions(+), 19 deletions(-) diff --git a/renet_steam_transport/Cargo.toml b/renet_steam_transport/Cargo.toml index 06895d92..70263267 100644 --- a/renet_steam_transport/Cargo.toml +++ b/renet_steam_transport/Cargo.toml @@ -13,3 +13,4 @@ edition = "2021" [dependencies] renet = { path = "../renet" } steamworks = "0.10" +log = "0.4.19" diff --git a/renet_steam_transport/src/lib.rs b/renet_steam_transport/src/lib.rs index dbceb4e7..bb0629da 100644 --- a/renet_steam_transport/src/lib.rs +++ b/renet_steam_transport/src/lib.rs @@ -1,7 +1,5 @@ -const MTU_UNRELIABLE: usize = 1200; // 1200 bytes for unreliable packets before they got split by Steam -const MTU_RELIABLE: usize = 512 * 1024; // 512 KB for reliable packets, they may accept more by recieve but this is the max size of a single packet the they can send - pub mod transport { + const HOST_CLIENT: u64 = u64::MAX; pub mod client; pub mod server; } diff --git a/renet_steam_transport/src/transport/server.rs b/renet_steam_transport/src/transport/server.rs index 373bc06d..a46ff9dd 100644 --- a/renet_steam_transport/src/transport/server.rs +++ b/renet_steam_transport/src/transport/server.rs @@ -1,35 +1,45 @@ -// TODO add client itself as connection maybe over a const u64 identifier or alternativly its own steamid? +const MAX_MESSAGE_BATCH_SIZE: usize = 255; use std::{collections::HashMap, time::Duration}; use renet::RenetServer; use steamworks::{ networking_sockets::{ListenSocket, NetConnection}, - networking_types::NetworkingConfigEntry, - Client, ClientManager, SteamId, + networking_types::{ListenSocketEvent, NetConnectionEnd, NetworkingConfigEntry, SendFlags}, + Client, ClientManager, ServerManager, SteamId, }; +use super::HOST_CLIENT; + pub struct SteamTransportConfig { max_clients: usize, } pub struct Server { + /// hold the active socket of the server listen_socket: ListenSocket, + /// hold the configuration of the server config: SteamTransportConfig, + /// hold the active connections of the server (key is the client id) (value is the connection) connections: HashMap>, + /// is needed to handle the disconnect of a client (key is the steam id) (value is the client id) connections_steam_id: HashMap, + /// hold the packets for the host client + host_queue: Vec>, } trait Transport { fn update(&mut self, duration: Duration, server: &mut RenetServer); - fn send_packets(&self, server: &mut RenetServer); + fn send_packets(&mut self, server: &mut RenetServer); } +// ClientManager implementation + impl Transport for Server { - fn update(&mut self, duration: Duration, server: &mut RenetServer) { + fn update(&mut self, _duration: Duration, server: &mut RenetServer) { match self.listen_socket.try_receive_event() { Some(event) => match event { - steamworks::networking_types::ListenSocketEvent::Connected(event) => { + ListenSocketEvent::Connected(event) => { let client_id = server.get_free_id(); match event.remote().steam_id() { Some(steam_id) => { @@ -40,32 +50,86 @@ impl Transport for Server { server.add_connection(client_id); self.connections.insert(client_id, event.take_connection()); } - steamworks::networking_types::ListenSocketEvent::Disconnected(event) => match event.remote().steam_id() { + ListenSocketEvent::Disconnected(event) => match event.remote().steam_id() { Some(steam_id) => { - // TODO check if there is a better way - let client_id = self.connections_steam_id.get(&steam_id.clone()).unwrap().clone(); - server.remove_connection(client_id); - self.connections_steam_id.remove(&steam_id); - self.connections.remove(&client_id); + if let Some(client_id) = self.connections_steam_id.get(&steam_id.clone()) { + server.remove_connection(*client_id); + self.connections.remove(&client_id); + self.connections_steam_id.remove(&steam_id); + } } None => {} }, - steamworks::networking_types::ListenSocketEvent::Connecting(event) => { + ListenSocketEvent::Connecting(event) => { if server.connected_clients() < self.config.max_clients { let _ = event.accept(); } else { - event.reject(steamworks::networking_types::NetConnectionEnd::AppGeneric, Some("Too many clients")); + event.reject(NetConnectionEnd::AppGeneric, Some("Too many clients")); } } }, _ => {} } + for (id, connection) in self.connections.iter_mut() { + if id == &HOST_CLIENT { + for packet in self.host_queue.iter() { + let _ = server.process_packet_from(&packet, *id); + } + self.host_queue.clear(); + continue; + } + // TODO this allocates on the side of steamworks.rs and should be avoided, PR needed + let messages = connection.receive_messages(MAX_MESSAGE_BATCH_SIZE); + messages.iter().for_each(|message| { + let _ = server.process_packet_from(message.data(), *id); + }); + } } - fn send_packets(&self, server: &mut RenetServer) {} + fn send_packets(&mut self, server: &mut RenetServer) { + for client_id in self.connections.keys() { + let packets = server.get_packets_to_send(*client_id).unwrap(); + if client_id == &HOST_CLIENT { + self.host_queue.extend(packets); + continue; + } + if let Some(connection) = self.connections.get(&client_id) { + Server::::send_packet_to_connection(packets, connection, *client_id); + } + } + } } impl Server { + /// Create a new server + /// # Arguments + /// * `client` - the steamworks client + /// * `config` - the configuration of the server + /// + /// # Panics + /// Panics if the [`init_relay_network_access`](steamworks::networking_utils::NetworkingUtils) was not initialized + /// + /// # Example + /// + /// ``` + /// use steamworks::{Client, ClientManager, networking_utils::RelayNetworkStatus}; + /// use renet_steam_transport::transport::server::{Server, SteamTransportConfig}; + /// + /// let client = Client::init().unwrap(); + /// client.networking_utils().relay_network_status_callback(relay_network_status); + /// client.networking_utils().init_relay_network_access(); + + /// + /// fn relay_network_status(relay: RelayNetworkStatus) { + /// match relay.availability() { + /// RelayNetworkAvailability::Current => { + /// let config = SteamTransportConfig { max_clients: 10 }; + /// let mut server = Server::::new(&client, config); + /// // do something with the server + /// } + /// _ => {} + /// } + /// ``` pub fn new(client: &Client, config: SteamTransportConfig) -> Self { // TODO this must be called at the beginning of the application client.networking_utils().init_relay_network_access(); @@ -76,7 +140,6 @@ impl Server { socket = listen_socket; } Err(handle) => { - // TODO Logging instead of panic panic!("Failed to create listen socket: {:?}", handle); } } @@ -85,9 +148,70 @@ impl Server { config, connections: HashMap::new(), connections_steam_id: HashMap::new(), + host_queue: Vec::new(), + } + } + + pub fn init_host(&mut self, server: &mut RenetServer) { + // one time allocation only required for the host + self.host_queue = Vec::with_capacity(MAX_MESSAGE_BATCH_SIZE); + server.add_connection(HOST_CLIENT); + } + + pub fn is_host_active(&self) -> bool { + self.connections.contains_key(&HOST_CLIENT) + } + + pub fn max_clients(&self) -> usize { + self.config.max_clients + } + + /// Disconnects a client from the server. + pub fn disconnect_client(&mut self, client_id: u64, server: &mut RenetServer, flush_last_packets: bool) { + if let Some((_key, value)) = self.connections.remove_entry(&client_id) { + let _ = value.close(NetConnectionEnd::AppGeneric, Some("Client was kicked"), flush_last_packets); + } + server.remove_connection(client_id); + } + /// Disconnects all active clients including the host client from the server. + pub fn disconnect_all(&mut self, server: &mut RenetServer, flush_last_packets: bool) { + let keys = self.connections.keys().cloned().collect::>(); + for client_id in keys { + if client_id == HOST_CLIENT { + server.remove_connection(client_id); + continue; + } + let _ = self.connections.remove_entry(&client_id).unwrap().1.close( + NetConnectionEnd::AppGeneric, + Some("Client was kicked"), + flush_last_packets, + ); + server.remove_connection(client_id); + } + } + /// while this works fine we should probaly use the send_messages function from the listen_socket + /// TODO to evaluate + fn send_packet_to_connection(packets: Vec>, connection: &NetConnection, client_id: u64) { + for packet in packets { + // TODO send reliable or unreliable depending on the packet + if let Err(error) = connection.send_message(&packet, SendFlags::RELIABLE) { + log::error!("Failed to send packet to client {}: {}", client_id, error); + } + } + if let Err(error) = connection.flush_messages() { + log::warn!("Failed to flush messages for client {}: {}", client_id, error); } } } +// ServerManager implementation + +impl Transport for Server { + fn send_packets(&mut self, _server: &mut RenetServer) {} + + fn update(&mut self, _duration: Duration, _server: &mut RenetServer) {} +} + +impl Server {} // Extensions for the RenetServer trait AutoGeneratedId { From 2801c51a400486dc4ff016346bb29ee0604679cb Mon Sep 17 00:00:00 2001 From: Florian Pabst Date: Sun, 18 Jun 2023 13:33:11 +0200 Subject: [PATCH 05/28] Fix bug where the server would only recognize one event at a time --- renet_steam_transport/src/transport/server.rs | 80 +++++++++++-------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/renet_steam_transport/src/transport/server.rs b/renet_steam_transport/src/transport/server.rs index a46ff9dd..b8bd6f45 100644 --- a/renet_steam_transport/src/transport/server.rs +++ b/renet_steam_transport/src/transport/server.rs @@ -36,40 +36,9 @@ trait Transport { // ClientManager implementation impl Transport for Server { + /// Update should run after client run the callback fn update(&mut self, _duration: Duration, server: &mut RenetServer) { - match self.listen_socket.try_receive_event() { - Some(event) => match event { - ListenSocketEvent::Connected(event) => { - let client_id = server.get_free_id(); - match event.remote().steam_id() { - Some(steam_id) => { - self.connections_steam_id.insert(steam_id, client_id); - } - _ => {} - } - server.add_connection(client_id); - self.connections.insert(client_id, event.take_connection()); - } - ListenSocketEvent::Disconnected(event) => match event.remote().steam_id() { - Some(steam_id) => { - if let Some(client_id) = self.connections_steam_id.get(&steam_id.clone()) { - server.remove_connection(*client_id); - self.connections.remove(&client_id); - self.connections_steam_id.remove(&steam_id); - } - } - None => {} - }, - ListenSocketEvent::Connecting(event) => { - if server.connected_clients() < self.config.max_clients { - let _ = event.accept(); - } else { - event.reject(NetConnectionEnd::AppGeneric, Some("Too many clients")); - } - } - }, - _ => {} - } + self.handle_events(server); for (id, connection) in self.connections.iter_mut() { if id == &HOST_CLIENT { for packet in self.host_queue.iter() { @@ -94,7 +63,7 @@ impl Transport for Server { continue; } if let Some(connection) = self.connections.get(&client_id) { - Server::::send_packet_to_connection(packets, connection, *client_id); + self.send_packet_to_connection(packets, connection, *client_id); } } } @@ -143,6 +112,7 @@ impl Server { panic!("Failed to create listen socket: {:?}", handle); } } + Self { listen_socket: socket, config, @@ -191,7 +161,7 @@ impl Server { } /// while this works fine we should probaly use the send_messages function from the listen_socket /// TODO to evaluate - fn send_packet_to_connection(packets: Vec>, connection: &NetConnection, client_id: u64) { + fn send_packet_to_connection(&self, packets: Vec>, connection: &NetConnection, client_id: u64) { for packet in packets { // TODO send reliable or unreliable depending on the packet if let Err(error) = connection.send_message(&packet, SendFlags::RELIABLE) { @@ -202,6 +172,46 @@ impl Server { log::warn!("Failed to flush messages for client {}: {}", client_id, error); } } + + /// Handle the events of the listen_socket until there are no more events + fn handle_events(&mut self, server: &mut RenetServer) { + let mut has_pending_events: bool = true; + while has_pending_events { + match self.listen_socket.try_receive_event() { + Some(event) => match event { + ListenSocketEvent::Connected(event) => { + let client_id = server.get_free_id(); + match event.remote().steam_id() { + Some(steam_id) => { + self.connections_steam_id.insert(steam_id, client_id); + } + _ => {} + } + server.add_connection(client_id); + self.connections.insert(client_id, event.take_connection()); + } + ListenSocketEvent::Disconnected(event) => match event.remote().steam_id() { + Some(steam_id) => { + if let Some(client_id) = self.connections_steam_id.get(&steam_id.clone()) { + server.remove_connection(*client_id); + self.connections.remove(&client_id); + self.connections_steam_id.remove(&steam_id); + } + } + None => {} + }, + ListenSocketEvent::Connecting(event) => { + if server.connected_clients() < self.config.max_clients { + let _ = event.accept(); + } else { + event.reject(NetConnectionEnd::AppGeneric, Some("Too many clients")); + } + } + }, + None => has_pending_events = false, + } + } + } } // ServerManager implementation From 1d76f53acce0c48ee1105853ef49f901a39a14b6 Mon Sep 17 00:00:00 2001 From: Florian Pabst Date: Sun, 18 Jun 2023 20:11:34 +0200 Subject: [PATCH 06/28] Removed specific host implementation - added client - added optional bevy --- renet_steam_transport/Cargo.toml | 1 + renet_steam_transport/src/lib.rs | 11 +- renet_steam_transport/src/transport/client.rs | 65 ++++++++ renet_steam_transport/src/transport/server.rs | 141 ++++++------------ 4 files changed, 119 insertions(+), 99 deletions(-) diff --git a/renet_steam_transport/Cargo.toml b/renet_steam_transport/Cargo.toml index 70263267..290d0a42 100644 --- a/renet_steam_transport/Cargo.toml +++ b/renet_steam_transport/Cargo.toml @@ -14,3 +14,4 @@ edition = "2021" renet = { path = "../renet" } steamworks = "0.10" log = "0.4.19" +bevy_ecs = { version = "0.10.1", optional = true } diff --git a/renet_steam_transport/src/lib.rs b/renet_steam_transport/src/lib.rs index bb0629da..7ddc8f6c 100644 --- a/renet_steam_transport/src/lib.rs +++ b/renet_steam_transport/src/lib.rs @@ -1,5 +1,14 @@ pub mod transport { - const HOST_CLIENT: u64 = u64::MAX; + use std::time::Duration; + + use renet::{RenetClient, RenetServer}; + + const MAX_MESSAGE_BATCH_SIZE: usize = 255; + + trait Transport { + fn update(&mut self, duration: Duration, handler: &mut T); + fn send_packets(&mut self, handler: &mut T); + } pub mod client; pub mod server; } diff --git a/renet_steam_transport/src/transport/client.rs b/renet_steam_transport/src/transport/client.rs index e69de29b..cedd5e5e 100644 --- a/renet_steam_transport/src/transport/client.rs +++ b/renet_steam_transport/src/transport/client.rs @@ -0,0 +1,65 @@ +use std::time::Duration; + +use renet::RenetClient; +use steamworks::{ + networking_sockets::{InvalidHandle, NetConnection}, + networking_types::{NetworkingIdentity, SendFlags}, + ClientManager, SteamId, +}; + +use super::{Transport, MAX_MESSAGE_BATCH_SIZE}; +#[cfg_attr(feature = "bevy", derive(bevy_ecs::system::Resource))] + +pub struct Client { + connection: NetConnection, +} + +impl Client { + /// Create a new client connection to the server + /// + /// If the connection is not possible, it will return [`InvalidHandle`](steamworks::networking_sockets) + pub fn new(steam_client: &steamworks::Client, steam_id: &SteamId) -> Result { + let options = Vec::new(); + match steam_client + .networking_sockets() + .connect_p2p(NetworkingIdentity::new_steam_id(*steam_id), 0, options) + { + Ok(connection) => Ok(Self { connection }), + Err(h) => Err(h), + } + } + + pub fn client_id(&self, steam_client: &steamworks::Client) -> u64 { + steam_client.user().steam_id().raw() + } + + pub fn disconnect(self, send_last_packets: bool) { + self.connection.close( + steamworks::networking_types::NetConnectionEnd::AppGeneric, + Some("Disconnecting from server"), + send_last_packets, + ); + } +} + +impl Transport for Client { + fn update(&mut self, _duration: Duration, client: &mut RenetClient) { + let messages = self.connection.receive_messages(MAX_MESSAGE_BATCH_SIZE); + messages.iter().for_each(|message| { + client.process_packet(message.data()); + }); + } + + fn send_packets(&mut self, client: &mut RenetClient) { + let packets = client.get_packets_to_send(); + for packet in packets { + if let Err(e) = self.connection.send_message(&packet, SendFlags::UNRELIABLE) { + log::error!("Error while sending packet: {}", e); + } + } + match self.connection.flush_messages() { + Err(e) => log::error!("Error while flushing messages: {}", e), + _ => (), + } + } +} diff --git a/renet_steam_transport/src/transport/server.rs b/renet_steam_transport/src/transport/server.rs index b8bd6f45..71dd2b43 100644 --- a/renet_steam_transport/src/transport/server.rs +++ b/renet_steam_transport/src/transport/server.rs @@ -1,20 +1,20 @@ -const MAX_MESSAGE_BATCH_SIZE: usize = 255; - use std::{collections::HashMap, time::Duration}; use renet::RenetServer; use steamworks::{ - networking_sockets::{ListenSocket, NetConnection}, + networking_sockets::{InvalidHandle, ListenSocket, NetConnection}, networking_types::{ListenSocketEvent, NetConnectionEnd, NetworkingConfigEntry, SendFlags}, - Client, ClientManager, ServerManager, SteamId, + Client, ClientManager, ServerManager, }; -use super::HOST_CLIENT; +use super::{Transport, MAX_MESSAGE_BATCH_SIZE}; +#[cfg_attr(feature = "bevy", derive(bevy_ecs::system::Resource))] pub struct SteamTransportConfig { max_clients: usize, } +#[cfg_attr(feature = "bevy", derive(bevy_ecs::system::Resource))] pub struct Server { /// hold the active socket of the server listen_socket: ListenSocket, @@ -22,35 +22,22 @@ pub struct Server { config: SteamTransportConfig, /// hold the active connections of the server (key is the client id) (value is the connection) connections: HashMap>, - /// is needed to handle the disconnect of a client (key is the steam id) (value is the client id) - connections_steam_id: HashMap, - /// hold the packets for the host client - host_queue: Vec>, -} - -trait Transport { - fn update(&mut self, duration: Duration, server: &mut RenetServer); - fn send_packets(&mut self, server: &mut RenetServer); } // ClientManager implementation -impl Transport for Server { +impl Transport for Server { /// Update should run after client run the callback fn update(&mut self, _duration: Duration, server: &mut RenetServer) { self.handle_events(server); - for (id, connection) in self.connections.iter_mut() { - if id == &HOST_CLIENT { - for packet in self.host_queue.iter() { - let _ = server.process_packet_from(&packet, *id); - } - self.host_queue.clear(); - continue; - } + for (client_id, connection) in self.connections.iter_mut() { // TODO this allocates on the side of steamworks.rs and should be avoided, PR needed let messages = connection.receive_messages(MAX_MESSAGE_BATCH_SIZE); messages.iter().for_each(|message| { - let _ = server.process_packet_from(message.data(), *id); + match server.process_packet_from(message.data(), *client_id) { + Err(e) => log::error!("Error while processing payload for {}: {}", client_id, e), + _ => (), + }; }); } } @@ -58,12 +45,10 @@ impl Transport for Server { fn send_packets(&mut self, server: &mut RenetServer) { for client_id in self.connections.keys() { let packets = server.get_packets_to_send(*client_id).unwrap(); - if client_id == &HOST_CLIENT { - self.host_queue.extend(packets); - continue; - } if let Some(connection) = self.connections.get(&client_id) { - self.send_packet_to_connection(packets, connection, *client_id); + if let Err(e) = self.send_packets_to_connection(packets, connection) { + log::error!("Error while sending packet: {}", e); + } } } } @@ -71,13 +56,11 @@ impl Transport for Server { impl Server { /// Create a new server + /// it will return [`InvalidHandle`](steamworks::networking_sockets) if the server can't be created /// # Arguments /// * `client` - the steamworks client /// * `config` - the configuration of the server /// - /// # Panics - /// Panics if the [`init_relay_network_access`](steamworks::networking_utils::NetworkingUtils) was not initialized - /// /// # Example /// /// ``` @@ -99,39 +82,18 @@ impl Server { /// _ => {} /// } /// ``` - pub fn new(client: &Client, config: SteamTransportConfig) -> Self { - // TODO this must be called at the beginning of the application - client.networking_utils().init_relay_network_access(); + pub fn new(client: &Client, config: SteamTransportConfig) -> Result { let options: Vec = Vec::new(); - let socket; match client.networking_sockets().create_listen_socket_p2p(0, options) { - Ok(listen_socket) => { - socket = listen_socket; - } - Err(handle) => { - panic!("Failed to create listen socket: {:?}", handle); - } - } - - Self { - listen_socket: socket, - config, - connections: HashMap::new(), - connections_steam_id: HashMap::new(), - host_queue: Vec::new(), + Ok(listen_socket) => Ok(Self { + listen_socket, + config, + connections: HashMap::new(), + }), + Err(h) => Err(h), } } - pub fn init_host(&mut self, server: &mut RenetServer) { - // one time allocation only required for the host - self.host_queue = Vec::with_capacity(MAX_MESSAGE_BATCH_SIZE); - server.add_connection(HOST_CLIENT); - } - - pub fn is_host_active(&self) -> bool { - self.connections.contains_key(&HOST_CLIENT) - } - pub fn max_clients(&self) -> usize { self.config.max_clients } @@ -147,10 +109,6 @@ impl Server { pub fn disconnect_all(&mut self, server: &mut RenetServer, flush_last_packets: bool) { let keys = self.connections.keys().cloned().collect::>(); for client_id in keys { - if client_id == HOST_CLIENT { - server.remove_connection(client_id); - continue; - } let _ = self.connections.remove_entry(&client_id).unwrap().1.close( NetConnectionEnd::AppGeneric, Some("Client was kicked"), @@ -161,16 +119,20 @@ impl Server { } /// while this works fine we should probaly use the send_messages function from the listen_socket /// TODO to evaluate - fn send_packet_to_connection(&self, packets: Vec>, connection: &NetConnection, client_id: u64) { + fn send_packets_to_connection( + &self, + packets: Vec>, + connection: &NetConnection, + ) -> Result<(), steamworks::SteamError> { for packet in packets { - // TODO send reliable or unreliable depending on the packet - if let Err(error) = connection.send_message(&packet, SendFlags::RELIABLE) { - log::error!("Failed to send packet to client {}: {}", client_id, error); + if let Err(_e) = connection.send_message(&packet, SendFlags::UNRELIABLE) { + continue; } } - if let Err(error) = connection.flush_messages() { - log::warn!("Failed to flush messages for client {}: {}", client_id, error); + if let Err(e) = connection.flush_messages() { + return Err(e); } + Ok(()) } /// Handle the events of the listen_socket until there are no more events @@ -179,24 +141,19 @@ impl Server { while has_pending_events { match self.listen_socket.try_receive_event() { Some(event) => match event { - ListenSocketEvent::Connected(event) => { - let client_id = server.get_free_id(); - match event.remote().steam_id() { - Some(steam_id) => { - self.connections_steam_id.insert(steam_id, client_id); - } - _ => {} + ListenSocketEvent::Connected(event) => match event.remote().steam_id() { + Some(steam_id) => { + let client_id = steam_id.raw(); + server.add_connection(client_id); + self.connections.insert(client_id, event.take_connection()); } - server.add_connection(client_id); - self.connections.insert(client_id, event.take_connection()); - } + _ => {} + }, ListenSocketEvent::Disconnected(event) => match event.remote().steam_id() { Some(steam_id) => { - if let Some(client_id) = self.connections_steam_id.get(&steam_id.clone()) { - server.remove_connection(*client_id); - self.connections.remove(&client_id); - self.connections_steam_id.remove(&steam_id); - } + let client_id = steam_id.raw(); + server.remove_connection(client_id); + self.connections.remove(&client_id); } None => {} }, @@ -215,22 +172,10 @@ impl Server { } // ServerManager implementation -impl Transport for Server { +impl Transport for Server { fn send_packets(&mut self, _server: &mut RenetServer) {} fn update(&mut self, _duration: Duration, _server: &mut RenetServer) {} } impl Server {} - -// Extensions for the RenetServer -trait AutoGeneratedId { - fn get_free_id(&self) -> u64; -} - -impl AutoGeneratedId for RenetServer { - fn get_free_id(&self) -> u64 { - let id = self.clients_id().len() as u64 + 1; - id - } -} From 8151355f0f1bfb49184953211a2170d39ef34036 Mon Sep 17 00:00:00 2001 From: Florian Pabst Date: Mon, 19 Jun 2023 21:02:35 +0200 Subject: [PATCH 07/28] Added to the bevy_renet project --- bevy_renet/Cargo.toml | 21 +++++-- bevy_renet/src/lib.rs | 3 + bevy_renet/src/steam_transport.rs | 63 +++++++++++++++++++ renet_steam_transport/Cargo.toml | 3 + renet_steam_transport/src/lib.rs | 2 +- renet_steam_transport/src/transport/client.rs | 6 +- renet_steam_transport/src/transport/server.rs | 10 +-- 7 files changed, 95 insertions(+), 13 deletions(-) create mode 100644 bevy_renet/src/steam_transport.rs diff --git a/bevy_renet/Cargo.toml b/bevy_renet/Cargo.toml index f39fff4c..813ce2f9 100644 --- a/bevy_renet/Cargo.toml +++ b/bevy_renet/Cargo.toml @@ -13,13 +13,26 @@ version = "0.0.8" [features] default = ["transport"] transport = ["renet/transport"] +steam_transport = ["renet_steam_transport", "steamworks"] [dependencies] -bevy = {version = "0.10.1", default-features = false} -renet = {path = "../renet", version = "0.0.12", features = ["bevy"]} + + +bevy = { version = "0.10.1", default-features = false } +renet = { path = "../renet", version = "0.0.12", features = ["bevy"] } +renet_steam_transport = { path = "../renet_steam_transport", version = "0.0.1", features = [ + "bevy", +], optional = true } +steamworks = { version = "0.10", optional = true } [dev-dependencies] -bevy = {version = "0.10.1", default-features = false, features = ["bevy_core_pipeline", "bevy_render", "bevy_asset", "bevy_pbr", "x11"]} +bevy = { version = "0.10.1", default-features = false, features = [ + "bevy_core_pipeline", + "bevy_render", + "bevy_asset", + "bevy_pbr", + "x11", +] } bincode = "1.3.1" env_logger = "0.10.0" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } diff --git a/bevy_renet/src/lib.rs b/bevy_renet/src/lib.rs index ed017716..2351e72f 100644 --- a/bevy_renet/src/lib.rs +++ b/bevy_renet/src/lib.rs @@ -7,6 +7,9 @@ use renet::{RenetClient, RenetServer, ServerEvent}; #[cfg(feature = "transport")] pub mod transport; +#[cfg(feature = "steam_transport")] +pub mod steam_transport; + /// Set for networking systems. #[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RenetSet { diff --git a/bevy_renet/src/steam_transport.rs b/bevy_renet/src/steam_transport.rs new file mode 100644 index 00000000..6cf3d8e7 --- /dev/null +++ b/bevy_renet/src/steam_transport.rs @@ -0,0 +1,63 @@ +use bevy::{ + prelude::{resource_exists, App, Condition, CoreSet, IntoSystemConfig, IntoSystemSetConfig, Plugin, Res, ResMut, SystemSet}, + time::Time, +}; +use renet::{RenetClient, RenetServer}; +use renet_steam_transport::transport::{client::SteamClientTransport, server::SteamServerTransport, Transport}; +use steamworks::ClientManager; + +use crate::RenetSet; + +/// Set for networking systems. +#[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum TransportSet { + Client, + Server, +} +pub struct SteamServerPlugin; + +pub struct SteamClientPlugin; + +impl Plugin for SteamServerPlugin { + fn build(&self, app: &mut App) { + app.configure_set( + TransportSet::Server + .run_if(resource_exists::>().and_then(resource_exists::())) + .after(RenetSet::Server), + ); + app.add_system(Self::update_system.in_base_set(CoreSet::PreUpdate).in_set(TransportSet::Server)); + app.add_system(Self::send_packets.in_base_set(CoreSet::PostUpdate).in_set(TransportSet::Server)); + } +} + +impl SteamServerPlugin { + pub fn update_system(mut transport: ResMut>, mut server: ResMut, time: Res