From 144f8752318cb1a2854d5b3a769b84ee9fac8eda Mon Sep 17 00:00:00 2001 From: ngthhu Date: Fri, 26 Apr 2024 14:46:01 +0700 Subject: [PATCH] refactor: [#681] udp return errors instead of panicking --- src/console/clients/udp/checker.rs | 16 +- src/shared/bit_torrent/tracker/udp/client.rs | 214 +++++++++++-------- 2 files changed, 130 insertions(+), 100 deletions(-) diff --git a/src/console/clients/udp/checker.rs b/src/console/clients/udp/checker.rs index 12b8d764c..9b2a9011e 100644 --- a/src/console/clients/udp/checker.rs +++ b/src/console/clients/udp/checker.rs @@ -64,7 +64,7 @@ impl Client { let binding_address = local_bind_to.parse().context("binding local address")?; debug!("Binding to: {local_bind_to}"); - let udp_client = UdpClient::bind(&local_bind_to).await; + let udp_client = UdpClient::bind(&local_bind_to).await?; let bound_to = udp_client.socket.local_addr().context("bound local address")?; debug!("Bound to: {bound_to}"); @@ -88,7 +88,7 @@ impl Client { match &self.udp_tracker_client { Some(client) => { - client.udp_client.connect(&tracker_socket_addr.to_string()).await; + client.udp_client.connect(&tracker_socket_addr.to_string()).await?; self.remote_socket = Some(*tracker_socket_addr); Ok(()) } @@ -116,9 +116,9 @@ impl Client { match &self.udp_tracker_client { Some(client) => { - client.send(connect_request.into()).await; + client.send(connect_request.into()).await?; - let response = client.receive().await; + let response = client.receive().await?; debug!("connection request response:\n{response:#?}"); @@ -163,9 +163,9 @@ impl Client { match &self.udp_tracker_client { Some(client) => { - client.send(announce_request.into()).await; + client.send(announce_request.into()).await?; - let response = client.receive().await; + let response = client.receive().await?; debug!("announce request response:\n{response:#?}"); @@ -200,9 +200,9 @@ impl Client { match &self.udp_tracker_client { Some(client) => { - client.send(scrape_request.into()).await; + client.send(scrape_request.into()).await?; - let response = client.receive().await; + let response = client.receive().await?; debug!("scrape request response:\n{response:#?}"); diff --git a/src/shared/bit_torrent/tracker/udp/client.rs b/src/shared/bit_torrent/tracker/udp/client.rs index 3df2a3da2..d0f7c0fac 100644 --- a/src/shared/bit_torrent/tracker/udp/client.rs +++ b/src/shared/bit_torrent/tracker/udp/client.rs @@ -1,18 +1,17 @@ +use core::result::Result::{Err, Ok}; use std::io::Cursor; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE}; -use anyhow::anyhow; -use anyhow::{Context, Result}; -use core::result::Result::{Ok, Err}; -use anyhow::Error as AError; +use anyhow::{anyhow, Context, Result}; use aquatic_udp_protocol::{ConnectRequest, Request, Response, TransactionId}; use log::debug; use tokio::net::UdpSocket; use tokio::time; +use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE}; + /// Default timeout for sending and receiving packets. And waiting for sockets /// to be readable and writable. const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); @@ -28,13 +27,17 @@ pub struct UdpClient { } impl UdpClient { - /// # Panics + /// # Errors /// - /// Will panic if the local address can't be bound. + /// Will return error if the local address can't be bound. /// pub async fn bind(local_address: &str) -> Result { - let socket_addr = local_address.parse::().map_err(|err| err).context("{local_address} is not a valid socket address")?; + let socket_addr = local_address + .parse::() + .context(format!("{local_address} is not a valid socket address"))?; + let socket = UdpSocket::bind(socket_addr).await?; + let udp_client = Self { socket: Arc::new(socket), timeout: DEFAULT_TIMEOUT, @@ -42,69 +45,74 @@ impl UdpClient { Ok(udp_client) } - /// # Panics + /// # Errors /// - /// Will panic if can't connect to the socket. + /// Will return error if can't connect to the socket. pub async fn connect(&self, remote_address: &str) -> anyhow::Result<()> { - let socket_addr = remote_address.parse::().map_err(|err| err).context(format!("{} is not a valid socket address", remote_address))?; - self.socket.connect(socket_addr).await.map_err(|err| err)?; - Ok(()) + let socket_addr = remote_address + .parse::() + .context(format!("{remote_address} is not a valid socket address"))?; + + match self.socket.connect(socket_addr).await { + Ok(()) => { + debug!("Connected successfully"); + Ok(()) + } + Err(e) => Err(anyhow!("Failed to connect: {e:?}")), + } } - /// # Panics + /// # Errors /// - /// Will panic if: + /// Will return error if: /// /// - Can't write to the socket. /// - Can't send data. pub async fn send(&self, bytes: &[u8]) -> Result { debug!(target: "UDP client", "sending {bytes:?} ..."); - let _:Result<(), anyhow::Error> = match time::timeout(self.timeout, self.socket.writable()).await { + match time::timeout(self.timeout, self.socket.writable()).await { Ok(writable_result) => { - let writable_result_status : Result<(), anyhow::Error> = match writable_result { - Ok(()) => Ok(()), - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")) + match writable_result { + Ok(()) => (), + Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")), }; - writable_result_status } - Err(e) => Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")) + Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")), }; - let send_status:Result = match time::timeout(self.timeout, self.socket.send(bytes)).await { - Ok(send_result) => { - let send_result_status: Result = match send_result { - Ok(size) => Ok(size), - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {}", e)) - }; - send_result_status - } - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {}", e)) - }; - send_status + match time::timeout(self.timeout, self.socket.send(bytes)).await { + Ok(send_result) => match send_result { + Ok(size) => Ok(size), + Err(e) => Err(anyhow!("IO error during send: {e:?}")), + }, + Err(e) => Err(anyhow!("Send operation timed out: {e:?}")), + } } - /// # Panics + /// # Errors /// - /// Will panic if: + /// Will return error if: /// /// - Can't read from the socket. /// - Can't receive data. + /// + /// # Panics + /// pub async fn receive(&self, bytes: &mut [u8]) -> Result { debug!(target: "UDP client", "receiving ..."); - let _ :Result<(), anyhow::Error>= match time::timeout(self.timeout, self.socket.readable()).await { + match time::timeout(self.timeout, self.socket.readable()).await { Ok(readable_result) => { - let readable_result_status = match readable_result { - Ok(()) => Ok(()), - Err(e) => Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")), + match readable_result { + Ok(()) => (), + Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")), }; - readable_result_status - }, - Err(e) => Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")), + } + Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")), }; - let size: Result = match time::timeout(self.timeout, self.socket.recv(bytes)).await { + let size_result = match time::timeout(self.timeout, self.socket.recv(bytes)).await { Ok(recv_result) => match recv_result { Ok(size) => Ok(size), Err(e) => Err(anyhow!("IO error during send: {e:?}")), @@ -112,23 +120,27 @@ impl UdpClient { Err(e) => Err(anyhow!("Receive operation timed out: {e:?}")), }; - debug!(target: "UDP client", "{size} bytes received {bytes:?}"); - - size + if size_result.is_ok() { + let size = size_result.as_ref().unwrap(); + debug!(target: "UDP client", "{size} bytes received {bytes:?}"); + size_result + } else { + size_result + } } - +} /// Creates a new `UdpClient` connected to a Udp server +/// +/// # Errors +/// +/// Will return any errors present in the call stack +/// pub async fn new_udp_client_connected(remote_address: &str) -> Result { let port = 0; // Let OS choose an unused port. - match UdpClient::bind(&source_address(port)).await { - Ok(client) => { - client.connect(remote_address).await; - Ok(client) - } - Err(err) => Err(err), - } -} + let client = UdpClient::bind(&source_address(port)).await?; + client.connect(remote_address).await?; + Ok(client) } #[allow(clippy::module_name_repetitions)] @@ -138,9 +150,9 @@ pub struct UdpTrackerClient { } impl UdpTrackerClient { - /// # Panics + /// # Errors /// - /// Will panic if can't write request to bytes. + /// Will return error if can't write request to bytes. pub async fn send(&self, request: Request) -> Result { debug!(target: "UDP tracker client", "send request {request:?}"); @@ -148,75 +160,93 @@ impl UdpTrackerClient { let request_buffer = vec![0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(request_buffer); - let request_data = match request.write(&mut cursor) { + let request_data_result = match request.write(&mut cursor) { Ok(()) => { #[allow(clippy::cast_possible_truncation)] let position = cursor.position() as usize; let inner_request_buffer = cursor.get_ref(); // Return slice which contains written request data - &inner_request_buffer[..position] + Ok(&inner_request_buffer[..position]) } Err(e) => Err(anyhow!("could not write request to bytes: {e}.")), }; + let request_data = request_data_result?; + self.udp_client.send(request_data).await } - /// # Panics + /// # Errors /// - /// Will panic if can't create response from the received payload (bytes buffer). - pub async fn receive(&self) -> Response { + /// Will return error if can't create response from the received payload (bytes buffer). + pub async fn receive(&self) -> Result { let mut response_buffer = [0u8; MAX_PACKET_SIZE]; - let payload_size = self.udp_client.receive(&mut response_buffer).await; + let payload_size = self.udp_client.receive(&mut response_buffer).await?; debug!(target: "UDP tracker client", "received {payload_size} bytes. Response {response_buffer:?}"); - Response::from_bytes(&response_buffer[..payload_size], true).unwrap() + let response = Response::from_bytes(&response_buffer[..payload_size], true)?; + + Ok(response) } } /// Creates a new `UdpTrackerClient` connected to a Udp Tracker server -pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTrackerClient { +/// +/// # Errors +/// +/// Will return any errors present in the call stack +/// +pub async fn new_udp_tracker_client_connected(remote_address: &str) -> Result { let udp_client = new_udp_client_connected(remote_address).await?; - UdpTrackerClient { udp_client.unwrap() } + let udp_tracker_client = UdpTrackerClient { udp_client }; + Ok(udp_tracker_client) } /// Helper Function to Check if a UDP Service is Connectable /// -/// # Errors +/// # Panics /// /// It will return an error if unable to connect to the UDP service. /// -/// # Panics +/// # Errors +/// pub async fn check(binding: &SocketAddr) -> Result { debug!("Checking Service (detail): {binding:?}."); - let client = new_udp_tracker_client_connected(binding.to_string().as_str()).await; - - let connect_request = ConnectRequest { - transaction_id: TransactionId(123), - }; - - client.send(connect_request.into()).await; - - let process = move |response| { - if matches!(response, Response::Connect(_connect_response)) { - Ok("Connected".to_string()) - } else { - Error("Did not Connect".to_string()) - } - }; - - let sleep = time::sleep(Duration::from_millis(2000)); - tokio::pin!(sleep); - - tokio::select! { - () = &mut sleep => { - Error("Timed Out".to_string()) - } - response = client.receive() => { - process(response) + match new_udp_tracker_client_connected(binding.to_string().as_str()).await { + Ok(client) => { + let connect_request = ConnectRequest { + transaction_id: TransactionId(123), + }; + + // client.send() return usize, but doesn't use here + match client.send(connect_request.into()).await { + Ok(_) => (), + Err(e) => debug!("Error: {e:?}."), + }; + + let process = move |response| { + if matches!(response, Response::Connect(_connect_response)) { + Ok("Connected".to_string()) + } else { + Err("Did not Connect".to_string()) + } + }; + + let sleep = time::sleep(Duration::from_millis(2000)); + tokio::pin!(sleep); + + tokio::select! { + () = &mut sleep => { + Err("Timed Out".to_string()) + } + response = client.receive() => { + process(response.unwrap()) + } + } } + Err(e) => Err(format!("{e:?}")), } }