Skip to content

Commit

Permalink
refactor: [#681] udp return errors instead of panicking
Browse files Browse the repository at this point in the history
  • Loading branch information
hungfnt committed May 2, 2024
1 parent 92349d3 commit effca56
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 109 deletions.
16 changes: 8 additions & 8 deletions src/console/clients/udp/checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Client {
let binding_address = local_bind_to.parse().context("binding local address")?;

debug!("Binding to: {local_bind_to}");
let udp_client = UdpClient::bind(&local_bind_to).await;
let udp_client = UdpClient::bind(&local_bind_to).await?;

let bound_to = udp_client.socket.local_addr().context("bound local address")?;
debug!("Bound to: {bound_to}");
Expand All @@ -88,7 +88,7 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.udp_client.connect(&tracker_socket_addr.to_string()).await;
client.udp_client.connect(&tracker_socket_addr.to_string()).await?;
self.remote_socket = Some(*tracker_socket_addr);
Ok(())
}
Expand Down Expand Up @@ -116,9 +116,9 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.send(connect_request.into()).await;
client.send(connect_request.into()).await?;

let response = client.receive().await;
let response = client.receive().await?;

debug!("connection request response:\n{response:#?}");

Expand Down Expand Up @@ -163,9 +163,9 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.send(announce_request.into()).await;
client.send(announce_request.into()).await?;

let response = client.receive().await;
let response = client.receive().await?;

debug!("announce request response:\n{response:#?}");

Expand Down Expand Up @@ -200,9 +200,9 @@ impl Client {

match &self.udp_tracker_client {
Some(client) => {
client.send(scrape_request.into()).await;
client.send(scrape_request.into()).await?;

let response = client.receive().await;
let response = client.receive().await?;

debug!("scrape request response:\n{response:#?}");

Expand Down
215 changes: 128 additions & 87 deletions src/shared/bit_torrent/tracker/udp/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use core::result::Result::{Err, Ok};
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use aquatic_udp_protocol::{ConnectRequest, Request, Response, TransactionId};
use log::debug;
use tokio::net::UdpSocket;
Expand All @@ -25,99 +27,120 @@ pub struct UdpClient {
}

impl UdpClient {
/// # Panics
/// # Errors
///
/// Will return error if the local address can't be bound.
///
/// Will panic if the local address can't be bound.
pub async fn bind(local_address: &str) -> Self {
let valid_socket_addr = local_address
pub async fn bind(local_address: &str) -> Result<Self> {
let socket_addr = local_address
.parse::<SocketAddr>()
.unwrap_or_else(|_| panic!("{local_address} is not a valid socket address"));
.context(format!("{local_address} is not a valid socket address"))?;

let socket = UdpSocket::bind(valid_socket_addr).await.unwrap();
let socket = UdpSocket::bind(socket_addr).await?;

Self {
let udp_client = Self {
socket: Arc::new(socket),
timeout: DEFAULT_TIMEOUT,
}
};
Ok(udp_client)
}

/// # Panics
/// # Errors
///
/// Will panic if can't connect to the socket.
pub async fn connect(&self, remote_address: &str) {
let valid_socket_addr = remote_address
/// Will return error if can't connect to the socket.
pub async fn connect(&self, remote_address: &str) -> Result<()> {
let socket_addr = remote_address
.parse::<SocketAddr>()
.unwrap_or_else(|_| panic!("{remote_address} is not a valid socket address"));
.context(format!("{remote_address} is not a valid socket address"))?;

match self.socket.connect(valid_socket_addr).await {
Ok(()) => debug!("Connected successfully"),
Err(e) => panic!("Failed to connect: {e:?}"),
match self.socket.connect(socket_addr).await {
Ok(()) => {
debug!("Connected successfully");
Ok(())
}
Err(e) => Err(anyhow!("Failed to connect: {e:?}")),

Check warning on line 61 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L61

Added line #L61 was not covered by tests
}
}

/// # Panics
/// # Errors
///
/// Will panic if:
/// Will return error if:
///
/// - Can't write to the socket.
/// - Can't send data.
pub async fn send(&self, bytes: &[u8]) -> usize {
pub async fn send(&self, bytes: &[u8]) -> Result<usize> {
debug!(target: "UDP client", "sending {bytes:?} ...");

match time::timeout(self.timeout, self.socket.writable()).await {
Ok(writable_result) => match writable_result {
Ok(()) => (),
Err(e) => panic!("{}", format!("IO error waiting for the socket to become readable: {e:?}")),
},
Err(e) => panic!("{}", format!("Timeout waiting for the socket to become readable: {e:?}")),
Ok(writable_result) => {
match writable_result {
Ok(()) => (),
Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")),

Check warning on line 78 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L78

Added line #L78 was not covered by tests
};
}
Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")),

Check warning on line 81 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L81

Added line #L81 was not covered by tests
};

match time::timeout(self.timeout, self.socket.send(bytes)).await {
Ok(send_result) => match send_result {
Ok(size) => size,
Err(e) => panic!("{}", format!("IO error during send: {e:?}")),
Ok(size) => Ok(size),
Err(e) => Err(anyhow!("IO error during send: {e:?}")),

Check warning on line 87 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L87

Added line #L87 was not covered by tests
},
Err(e) => panic!("{}", format!("Send operation timed out: {e:?}")),
Err(e) => Err(anyhow!("Send operation timed out: {e:?}")),

Check warning on line 89 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L89

Added line #L89 was not covered by tests
}
}

/// # Panics
/// # Errors
///
/// Will panic if:
/// Will return error if:
///
/// - Can't read from the socket.
/// - Can't receive data.
pub async fn receive(&self, bytes: &mut [u8]) -> usize {
///
/// # Panics
///
pub async fn receive(&self, bytes: &mut [u8]) -> Result<usize> {
debug!(target: "UDP client", "receiving ...");

match time::timeout(self.timeout, self.socket.readable()).await {
Ok(readable_result) => match readable_result {
Ok(()) => (),
Err(e) => panic!("{}", format!("IO error waiting for the socket to become readable: {e:?}")),
},
Err(e) => panic!("{}", format!("Timeout waiting for the socket to become readable: {e:?}")),
Ok(readable_result) => {
match readable_result {
Ok(()) => (),
Err(e) => return Err(anyhow!("IO error waiting for the socket to become readable: {e:?}")),

Check warning on line 109 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L109

Added line #L109 was not covered by tests
};
}
Err(e) => return Err(anyhow!("Timeout waiting for the socket to become readable: {e:?}")),

Check warning on line 112 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L112

Added line #L112 was not covered by tests
};

let size = match time::timeout(self.timeout, self.socket.recv(bytes)).await {
let size_result = match time::timeout(self.timeout, self.socket.recv(bytes)).await {
Ok(recv_result) => match recv_result {
Ok(size) => size,
Err(e) => panic!("{}", format!("IO error during send: {e:?}")),
Ok(size) => Ok(size),
Err(e) => Err(anyhow!("IO error during send: {e:?}")),

Check warning on line 118 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L118

Added line #L118 was not covered by tests
},
Err(e) => panic!("{}", format!("Receive operation timed out: {e:?}")),
Err(e) => Err(anyhow!("Receive operation timed out: {e:?}")),

Check warning on line 120 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L120

Added line #L120 was not covered by tests
};

debug!(target: "UDP client", "{size} bytes received {bytes:?}");

size
if size_result.is_ok() {
let size = size_result.as_ref().unwrap();
debug!(target: "UDP client", "{size} bytes received {bytes:?}");
size_result
} else {
size_result

Check warning on line 128 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L128

Added line #L128 was not covered by tests
}
}
}

/// Creates a new `UdpClient` connected to a Udp server
pub async fn new_udp_client_connected(remote_address: &str) -> UdpClient {
///
/// # Errors
///
/// Will return any errors present in the call stack
///
pub async fn new_udp_client_connected(remote_address: &str) -> Result<UdpClient> {
let port = 0; // Let OS choose an unused port.
let client = UdpClient::bind(&source_address(port)).await;
client.connect(remote_address).await;
client
let client = UdpClient::bind(&source_address(port)).await?;
client.connect(remote_address).await?;
Ok(client)
}

#[allow(clippy::module_name_repetitions)]
Expand All @@ -127,85 +150,103 @@ pub struct UdpTrackerClient {
}

impl UdpTrackerClient {
/// # Panics
/// # Errors
///
/// Will panic if can't write request to bytes.
pub async fn send(&self, request: Request) -> usize {
/// Will return error if can't write request to bytes.
pub async fn send(&self, request: Request) -> Result<usize> {
debug!(target: "UDP tracker client", "send request {request:?}");

// Write request into a buffer
let request_buffer = vec![0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(request_buffer);

let request_data = match request.write(&mut cursor) {
let request_data_result = match request.write(&mut cursor) {
Ok(()) => {
#[allow(clippy::cast_possible_truncation)]
let position = cursor.position() as usize;
let inner_request_buffer = cursor.get_ref();
// Return slice which contains written request data
&inner_request_buffer[..position]
Ok(&inner_request_buffer[..position])
}
Err(e) => panic!("could not write request to bytes: {e}."),
Err(e) => Err(anyhow!("could not write request to bytes: {e}.")),

Check warning on line 171 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L171

Added line #L171 was not covered by tests
};

let request_data = request_data_result?;

self.udp_client.send(request_data).await
}

/// # Panics
/// # Errors
///
/// Will panic if can't create response from the received payload (bytes buffer).
pub async fn receive(&self) -> Response {
/// Will return error if can't create response from the received payload (bytes buffer).
pub async fn receive(&self) -> Result<Response> {
let mut response_buffer = [0u8; MAX_PACKET_SIZE];

let payload_size = self.udp_client.receive(&mut response_buffer).await;
let payload_size = self.udp_client.receive(&mut response_buffer).await?;

debug!(target: "UDP tracker client", "received {payload_size} bytes. Response {response_buffer:?}");

Response::from_bytes(&response_buffer[..payload_size], true).unwrap()
let response = Response::from_bytes(&response_buffer[..payload_size], true)?;

Ok(response)
}
}

/// Creates a new `UdpTrackerClient` connected to a Udp Tracker server
pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTrackerClient {
let udp_client = new_udp_client_connected(remote_address).await;
UdpTrackerClient { udp_client }
///
/// # Errors
///
/// Will return any errors present in the call stack
///
pub async fn new_udp_tracker_client_connected(remote_address: &str) -> Result<UdpTrackerClient> {
let udp_client = new_udp_client_connected(remote_address).await?;
let udp_tracker_client = UdpTrackerClient { udp_client };
Ok(udp_tracker_client)
}

/// Helper Function to Check if a UDP Service is Connectable
///
/// # Errors
/// # Panics
///
/// It will return an error if unable to connect to the UDP service.
///
/// # Panics
/// # Errors
///
pub async fn check(binding: &SocketAddr) -> Result<String, String> {
debug!("Checking Service (detail): {binding:?}.");

let client = new_udp_tracker_client_connected(binding.to_string().as_str()).await;

let connect_request = ConnectRequest {
transaction_id: TransactionId(123),
};

client.send(connect_request.into()).await;

let process = move |response| {
if matches!(response, Response::Connect(_connect_response)) {
Ok("Connected".to_string())
} else {
Err("Did not Connect".to_string())
}
};

let sleep = time::sleep(Duration::from_millis(2000));
tokio::pin!(sleep);

tokio::select! {
() = &mut sleep => {
Err("Timed Out".to_string())
}
response = client.receive() => {
process(response)
match new_udp_tracker_client_connected(binding.to_string().as_str()).await {
Ok(client) => {
let connect_request = ConnectRequest {
transaction_id: TransactionId(123),
};

// client.send() return usize, but doesn't use here
match client.send(connect_request.into()).await {
Ok(_) => (),
Err(e) => debug!("Error: {e:?}."),

Check warning on line 227 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L227

Added line #L227 was not covered by tests
};

let process = move |response| {
if matches!(response, Response::Connect(_connect_response)) {
Ok("Connected".to_string())
} else {
Err("Did not Connect".to_string())

Check warning on line 234 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L234

Added line #L234 was not covered by tests
}
};

let sleep = time::sleep(Duration::from_millis(2000));
tokio::pin!(sleep);

tokio::select! {
() = &mut sleep => {
Err("Timed Out".to_string())
}
response = client.receive() => {
process(response.unwrap())
}
}
}
Err(e) => Err(format!("{e:?}")),

Check warning on line 250 in src/shared/bit_torrent/tracker/udp/client.rs

View check run for this annotation

Codecov / codecov/patch

src/shared/bit_torrent/tracker/udp/client.rs#L250

Added line #L250 was not covered by tests
}
}
Loading

0 comments on commit effca56

Please sign in to comment.