Skip to content
This repository has been archived by the owner on Apr 16, 2022. It is now read-only.

update naia-socket deps to 0.8 #52

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use-webrtc = [
[dependencies]
bevy = { version = "0.6", default-features = false }
turbulence = "0.3"
naia-client-socket = { version = "0.6", features = ["multithread"] }
naia-client-socket = "0.8"
naia-socket-shared = "0.8"
bytes = "1.1"
futures-lite = "1.12"
crossbeam-channel = "0.5"
Expand All @@ -39,7 +40,7 @@ futures = "0.3"
futures-timer = "3.0"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
naia-server-socket = "0.5"
naia-server-socket = "0.8"

[dev-dependencies]
clap = "2.34.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/idle_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn startup(mut net: ResMut<NetworkResource>, args: Res<Args>) {
}
if !args.is_server {
info!("Starting client");
net.connect(server_address);
net.connect(&format!("http://{}", server_address.to_string()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/message_coalescing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn startup(mut net: ResMut<NetworkResource>, args: Res<Args>) {
}
if !args.is_server {
info!("Starting client");
net.connect(server_address);
net.connect(&format!("http://{}", server_address.to_string()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn startup(mut net: ResMut<NetworkResource>, args: Res<Args>) {
}
if !args.is_server {
info!("Starting client");
net.connect(server_address);
net.connect(&format!("http://{}", server_address.to_string()));
}
}

Expand Down
65 changes: 37 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use bevy::{
};
#[cfg(not(target_arch = "wasm32"))]
use crossbeam_channel::{unbounded, SendError as CrossbeamSendError, Sender};
use naia_socket_shared::SocketConfig;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::RwLock;
use std::{
Expand All @@ -18,13 +19,13 @@ use std::{
sync::{atomic, Arc, Mutex},
};

use naia_client_socket::ClientSocket;
use naia_client_socket::Socket as ClientSocket;
#[cfg(not(target_arch = "wasm32"))]
use naia_server_socket::ServerSocket;
use naia_server_socket::Socket as ServerSocket;

pub use naia_client_socket::LinkConditionerConfig;
#[cfg(not(target_arch = "wasm32"))]
pub use naia_server_socket::find_my_ip_address;
pub use naia_socket_shared::find_my_ip_address;
pub use naia_socket_shared::LinkConditionerConfig;

use turbulence::{
buffer::BufferPacketPool,
Expand Down Expand Up @@ -223,36 +224,39 @@ impl NetworkResource {
&mut self,
socket_address: SocketAddr,
webrtc_listen_address: Option<SocketAddr>,
public_webrtc_address: Option<SocketAddr>,
public_webrtc_url: Option<&str>,
) {
let mut server_socket = {
use naia_server_socket::ServerAddrs;

let server_socket = {
let webrtc_listen_address = webrtc_listen_address.unwrap_or_else(|| {
let mut listen_addr = socket_address;
listen_addr.set_port(socket_address.port() + 1);
listen_addr
});
let public_webrtc_address = public_webrtc_address.unwrap_or(webrtc_listen_address);
let socket = futures_lite::future::block_on(ServerSocket::listen(

let mut socket = ServerSocket::new(SocketConfig {
link_condition_config: self.link_conditioner.clone(),
..Default::default()
});
socket.listen(ServerAddrs::new(
socket_address,
webrtc_listen_address,
public_webrtc_address,
public_webrtc_url.unwrap_or(&format!("http://{}", webrtc_listen_address)),
));

if let Some(ref conditioner) = self.link_conditioner {
socket.with_link_conditioner(conditioner)
} else {
socket
}
socket
};

let server_channels = self.server_channels.clone();
let pending_connections = self.pending_connections.clone();
let task_pool = self.task_pool.clone();

self.listeners.push(self.task_pool.spawn(async move {
let mut receiver = server_socket.packet_receiver();
loop {
match server_socket.receive().await {
Ok(packet) => {
match receiver.receive() {
Ok(Some(packet)) => {
let address = packet.address();
let message = String::from_utf8_lossy(packet.payload());
debug!(
Expand Down Expand Up @@ -299,7 +303,7 @@ impl NetworkResource {
transport::ServerConnection::new(
task_pool.clone(),
packet_rx,
server_socket.get_sender(),
server_socket.packet_sender(),
address,
),
));
Expand All @@ -313,6 +317,7 @@ impl NetworkResource {
}
}
}
Ok(None) => (),
Err(error) => {
error!("Server Receive Error: {}", error);
}
Expand All @@ -321,17 +326,18 @@ impl NetworkResource {
}));
}

pub fn connect(&mut self, socket_address: SocketAddr) {
let mut client_socket = {
let socket = ClientSocket::connect(socket_address);
pub fn connect(&mut self, server_session_url: &str) {
let client_socket = {
let mut socket = ClientSocket::new(SocketConfig {
link_condition_config: self.link_conditioner.clone(),
..Default::default()
});

if let Some(ref conditioner) = self.link_conditioner {
socket.with_link_conditioner(conditioner)
} else {
socket
}
socket.connect(server_session_url);
socket
};
let sender = client_socket.get_sender();

let sender = client_socket.packet_sender();

self.pending_connections
.lock()
Expand Down Expand Up @@ -367,7 +373,10 @@ impl NetworkResource {
payload: Packet,
) -> Result<(), Box<dyn Error + Sync + Send + 'static>> {
match self.connections.get_mut(&handle) {
Some(connection) => connection.send(payload),
Some(connection) => {
connection.send(payload);
Ok(())
}
None => Err(Box::new(std::io::Error::new(
// FIXME: move to enum Error
std::io::ErrorKind::NotFound,
Expand All @@ -378,7 +387,7 @@ impl NetworkResource {

pub fn broadcast(&mut self, payload: Packet) {
for (_handle, connection) in self.connections.iter_mut() {
connection.send(payload.clone()).unwrap();
connection.send(payload.clone());
}
}

Expand Down
47 changes: 19 additions & 28 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ use bevy::{prelude::error, tasks::TaskPool};
use bytes::Bytes;
use instant::{Duration, Instant};
use std::{
error::Error,
net::SocketAddr,
sync::{Arc, RwLock},
};

use naia_client_socket::{
ClientSocketTrait, MessageSender as ClientSender, Packet as ClientPacket,
Packet as ClientPacket, PacketSender as ClientSender, Socket as ClientSocket,
};
#[cfg(not(target_arch = "wasm32"))]
use naia_server_socket::{MessageSender as ServerSender, Packet as ServerPacket};
use naia_server_socket::{
Packet as ServerPacket, PacketSender as ServerSender
};

use turbulence::{
buffer::BufferPacketPool,
Expand All @@ -22,9 +23,6 @@ use turbulence::{
packet_multiplexer::{IncomingMultiplexedPackets, MuxPacket, MuxPacketPool, PacketMultiplexer},
};

#[cfg(not(target_arch = "wasm32"))]
use futures_lite::future::block_on;

use futures_lite::StreamExt;

use super::{
Expand Down Expand Up @@ -87,7 +85,7 @@ impl PacketStats {
pub trait Connection: Send + Sync {
fn remote_address(&self) -> Option<SocketAddr>;

fn send(&mut self, payload: Packet) -> Result<(), Box<dyn Error + Sync + Send>>;
fn send(&mut self, payload: Packet);

fn receive(&mut self) -> Option<Result<Packet, NetworkError>>;

Expand Down Expand Up @@ -154,17 +152,16 @@ impl Connection for ServerConnection {
self.stats.read().expect("stats lock poisoned").clone()
}

fn send(&mut self, payload: Packet) -> Result<(), Box<dyn Error + Sync + Send>> {
fn send(&mut self, payload: Packet) {
self.stats
.write()
.expect("stats lock poisoned")
.add_tx(payload.len());
block_on(
self.sender
.as_mut()
.unwrap()
.send(ServerPacket::new(self.client_address, payload.to_vec())),
)

self.sender
.as_mut()
.unwrap()
.send(ServerPacket::new(self.client_address, payload.to_vec()))
}

fn last_packet_timings(&self) -> (u128, u128) {
Expand Down Expand Up @@ -211,7 +208,7 @@ impl Connection for ServerConnection {
let (channels_rx, mut channels_tx) = multiplexer.start();
self.channels_rx = Some(channels_rx);

let mut sender = self.sender.take().unwrap();
let sender = self.sender.take().unwrap();
let client_address = self.client_address;
let stats = self.stats.clone();

Expand All @@ -223,9 +220,7 @@ impl Connection for ServerConnection {
.expect("stats lock poisoned")
.add_tx(packet.len());
sender
.send(ServerPacket::new(client_address, (*packet).into()))
.await
.unwrap();
.send(ServerPacket::new(client_address, (*packet).into()));
}
}));
}
Expand All @@ -242,7 +237,7 @@ impl Connection for ServerConnection {
pub struct ClientConnection {
task_pool: TaskPool,

socket: Box<dyn ClientSocketTrait>,
socket: ClientSocket,
sender: Option<ClientSender>,
stats: Arc<RwLock<PacketStats>>,

Expand All @@ -253,11 +248,7 @@ pub struct ClientConnection {
}

impl ClientConnection {
pub fn new(
task_pool: TaskPool,
socket: Box<dyn ClientSocketTrait>,
sender: ClientSender,
) -> Self {
pub fn new(task_pool: TaskPool, socket: ClientSocket, sender: ClientSender) -> Self {
ClientConnection {
task_pool,
socket,
Expand Down Expand Up @@ -289,19 +280,19 @@ impl Connection for ClientConnection {
(rx_dur.as_millis(), tx_dur.as_millis())
}

fn send(&mut self, payload: Packet) -> Result<(), Box<dyn Error + Sync + Send>> {
fn send(&mut self, payload: Packet) {
self.stats
.write()
.expect("stats lock poisoned")
.add_tx(payload.len());
self.sender
.as_mut()
.unwrap()
.send(ClientPacket::new(payload.to_vec()))
.send(ClientPacket::new(payload.to_vec()));
}

fn receive(&mut self) -> Option<Result<Packet, NetworkError>> {
match self.socket.receive() {
match self.socket.packet_receiver().receive() {
Ok(event) => event.map(|packet| {
self.stats
.write()
Expand Down Expand Up @@ -338,7 +329,7 @@ impl Connection for ClientConnection {
.write()
.expect("stats lock poisoned")
.add_tx(packet.len());
sender.send(ClientPacket::new((*packet).into())).unwrap();
sender.send(ClientPacket::new((*packet).into()));
}
None => {
error!("Channel stream Disconnected");
Expand Down