Skip to content

Commit

Permalink
refactor: [torrust#639] Tracker Checker: extract checker:Client to ch…
Browse files Browse the repository at this point in the history
…eck UDP servers

It will be used in teh Tracker Checker too.
  • Loading branch information
josecelano committed Jan 31, 2024
1 parent 8d07a34 commit 82ec491
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 115 deletions.
145 changes: 30 additions & 115 deletions src/console/clients/udp/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,22 @@
//! ```
//!
//! The protocol (`udp://`) in the URL is mandatory. The path (`\scrape`) is optional. It always uses `\scrape`.
use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs};
use std::net::{SocketAddr, ToSocketAddrs};
use std::str::FromStr;

use anyhow::Context;
use aquatic_udp_protocol::common::InfoHash;
use aquatic_udp_protocol::Response::{AnnounceIpv4, AnnounceIpv6, Scrape};
use aquatic_udp_protocol::{
AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response,
ScrapeRequest, TransactionId,
};
use aquatic_udp_protocol::{Port, TransactionId};
use clap::{Parser, Subcommand};
use log::{debug, LevelFilter};
use serde_json::json;
use url::Url;

use crate::console::clients::udp::checker;
use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash;
use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient};
use crate::shared::bit_torrent::tracker::udp::client::UdpClient;

const ASSIGNED_BY_OS: i32 = 0;
const ASSIGNED_BY_OS: u16 = 0;
const RANDOM_TRANSACTION_ID: i32 = -888_840_697;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -119,32 +116,42 @@ pub async fn run() -> anyhow::Result<()> {
debug!("Binding to: {local_bind_to}");
let udp_client = UdpClient::bind(&local_bind_to).await;
let bound_to = udp_client.socket.local_addr().context("binding local address")?;
debug!("Bound to: {bound_to}");

let transaction_id = TransactionId(transaction_id);
debug!("Bound to: {bound_to}");

let response = match args.command {
Command::Announce {
tracker_socket_addr,
info_hash,
} => {
let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await;

send_announce_request(
connection_id,
transaction_id,
info_hash,
Port(bound_to.port()),
&udp_tracker_client,
)
.await
let transaction_id = TransactionId(transaction_id);

let mut new_udp_client = checker::Client::default();
let _bound_to = new_udp_client.bind(local_port).await?;

new_udp_client.connect(&tracker_socket_addr, udp_client).await;

let connection_id = new_udp_client.send_connection_request(transaction_id).await?;

new_udp_client
.send_announce_request(connection_id, transaction_id, info_hash, Port(bound_to.port()))
.await?
}
Command::Scrape {
tracker_socket_addr,
info_hashes,
} => {
let (connection_id, udp_tracker_client) = connect(&tracker_socket_addr, udp_client, transaction_id).await;
send_scrape_request(connection_id, transaction_id, info_hashes, &udp_tracker_client).await
let transaction_id = TransactionId(transaction_id);

let mut new_udp_client = checker::Client::default();
let _bound_to = new_udp_client.bind(local_port).await?;

new_udp_client.connect(&tracker_socket_addr, udp_client).await;

let connection_id = new_udp_client.send_connection_request(transaction_id).await?;

new_udp_client
.send_scrape_request(connection_id, transaction_id, info_hashes)
.await?
}
};

Expand Down Expand Up @@ -265,95 +272,3 @@ fn parse_info_hash(info_hash_str: &str) -> anyhow::Result<TorrustInfoHash> {
TorrustInfoHash::from_str(info_hash_str)
.map_err(|e| anyhow::Error::msg(format!("failed to parse info-hash `{info_hash_str}`: {e:?}")))
}

async fn connect(
tracker_socket_addr: &SocketAddr,
udp_client: UdpClient,
transaction_id: TransactionId,
) -> (ConnectionId, UdpTrackerClient) {
debug!("Connecting to tracker: udp://{tracker_socket_addr}");

udp_client.connect(&tracker_socket_addr.to_string()).await;

let udp_tracker_client = UdpTrackerClient { udp_client };

let connection_id = send_connection_request(transaction_id, &udp_tracker_client).await;

(connection_id, udp_tracker_client)
}

async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId {
debug!("Sending connection request with transaction id: {transaction_id:#?}");

let connect_request = ConnectRequest { transaction_id };

client.send(connect_request.into()).await;

let response = client.receive().await;

debug!("connection request response:\n{response:#?}");

match response {
Response::Connect(connect_response) => connect_response.connection_id,
_ => panic!("error connecting to udp server. Unexpected response"),
}
}

async fn send_announce_request(
connection_id: ConnectionId,
transaction_id: TransactionId,
info_hash: TorrustInfoHash,
port: Port,
client: &UdpTrackerClient,
) -> Response {
debug!("Sending announce request with transaction id: {transaction_id:#?}");

let announce_request = AnnounceRequest {
connection_id,
transaction_id,
info_hash: InfoHash(info_hash.bytes()),
peer_id: PeerId(*b"-qB00000000000000001"),
bytes_downloaded: NumberOfBytes(0i64),
bytes_uploaded: NumberOfBytes(0i64),
bytes_left: NumberOfBytes(0i64),
event: AnnounceEvent::Started,
ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)),
key: PeerKey(0u32),
peers_wanted: NumberOfPeers(1i32),
port,
};

client.send(announce_request.into()).await;

let response = client.receive().await;

debug!("announce request response:\n{response:#?}");

response
}

async fn send_scrape_request(
connection_id: ConnectionId,
transaction_id: TransactionId,
info_hashes: Vec<TorrustInfoHash>,
client: &UdpTrackerClient,
) -> Response {
debug!("Sending scrape request with transaction id: {transaction_id:#?}");

let scrape_request = ScrapeRequest {
connection_id,
transaction_id,
info_hashes: info_hashes
.iter()
.map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes()))
.collect(),
};

client.send(scrape_request.into()).await;

let response = client.receive().await;

debug!("scrape request response:\n{response:#?}");

response
}
197 changes: 197 additions & 0 deletions src/console/clients/udp/checker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use std::net::{Ipv4Addr, SocketAddr};

use anyhow::Context;
use aquatic_udp_protocol::common::InfoHash;
use aquatic_udp_protocol::{
AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, Response,
ScrapeRequest, TransactionId,
};
use log::debug;
use thiserror::Error;

use crate::shared::bit_torrent::info_hash::InfoHash as TorrustInfoHash;
use crate::shared::bit_torrent::tracker::udp::client::{UdpClient, UdpTrackerClient};

#[derive(Error, Debug)]
pub enum ClientError {
#[error("Not connected to remote tracker UDP socket. Try connecting before making requests.")]
NotConnected,
#[error("Unexpected response while connecting the the remote server.")]
UnexpectedConnectionResponse,
}

/// A UDP Tracker client to make test requests (checks).
#[derive(Debug, Default)]
pub struct Client {
/// Local UDP socket. It could be 0 to assign a free port.
local_binding_address: Option<SocketAddr>,

/// Local UDP socket after binding. It's equals to binding address if a
/// non- zero port was used.
local_bound_address: Option<SocketAddr>,

/// Remote UDP tracker socket
remote_socket: Option<SocketAddr>,

/// The client used to make UDP requests to the tracker.
udp_tracker_client: Option<UdpTrackerClient>,

/// The client used to make UDP requests to a generic UDP server.
udp_client: Option<UdpClient>,
}

impl Client {
/// Binds local client socket.
///
/// # Errors
///
/// Will return an error if it can't get the bound local address.
pub async fn bind(&mut self, local_port: u16) -> anyhow::Result<SocketAddr> {
let local_bind_to = format!("0.0.0.0:{local_port}");
let binding_address = local_bind_to.parse().context("binding local address")?;

debug!("Binding to: {local_bind_to}");
let udp_client = UdpClient::bind(&local_bind_to).await;

let bound_to = udp_client.socket.local_addr().context("bound local address")?;
debug!("Bound to: {bound_to}");

self.local_binding_address = Some(binding_address);
self.local_bound_address = Some(bound_to);
self.udp_client = Some(udp_client);

Ok(bound_to)
}

/// Connects to the remote server socket.
///
/// # Errors
///
/// Will return and error if it can't make a connection request successfully
/// to the remote UDP server.
pub async fn connect(&mut self, tracker_socket_addr: &SocketAddr, udp_client: UdpClient) {
debug!("Connecting to tracker: udp://{tracker_socket_addr}");

udp_client.connect(&tracker_socket_addr.to_string()).await;

self.remote_socket = Some(*tracker_socket_addr);

self.udp_tracker_client = Some(UdpTrackerClient { udp_client });
}

/// Sends a connection request to the UDP Tracker server.
///
/// # Errors
///
/// Will return and error if
///
/// - It can't connect to the remote UDP socket.
/// - It can't make a connection request successfully to the remote UDP
/// server (after successfully connecting to the remote UDP socket).
///
/// # Panics
///
/// Will panic if it receives an unexpected response.
pub async fn send_connection_request(&self, transaction_id: TransactionId) -> anyhow::Result<ConnectionId> {
debug!("Sending connection request with transaction id: {transaction_id:#?}");

let connect_request = ConnectRequest { transaction_id };

match &self.udp_tracker_client {
Some(client) => {
client.send(connect_request.into()).await;

let response = client.receive().await;

debug!("connection request response:\n{response:#?}");

match response {
Response::Connect(connect_response) => Ok(connect_response.connection_id),
_ => Err(ClientError::UnexpectedConnectionResponse.into()),
}
}
None => Err(ClientError::NotConnected.into()),
}
}

/// Sends an announce request to the UDP Tracker server.
///
/// # Errors
///
/// Will return and error if the client is not connected. You have to connect
/// before calling this function.
pub async fn send_announce_request(
&self,
connection_id: ConnectionId,
transaction_id: TransactionId,
info_hash: TorrustInfoHash,
client_port: Port,
) -> anyhow::Result<Response> {
debug!("Sending announce request with transaction id: {transaction_id:#?}");

let announce_request = AnnounceRequest {
connection_id,
transaction_id,
info_hash: InfoHash(info_hash.bytes()),
peer_id: PeerId(*b"-qB00000000000000001"),
bytes_downloaded: NumberOfBytes(0i64),
bytes_uploaded: NumberOfBytes(0i64),
bytes_left: NumberOfBytes(0i64),
event: AnnounceEvent::Started,
ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)),
key: PeerKey(0u32),
peers_wanted: NumberOfPeers(1i32),
port: client_port,
};

match &self.udp_tracker_client {
Some(client) => {
client.send(announce_request.into()).await;

let response = client.receive().await;

debug!("announce request response:\n{response:#?}");

Ok(response)
}
None => Err(ClientError::NotConnected.into()),
}
}

/// Sends a scrape request to the UDP Tracker server.
///
/// # Errors
///
/// Will return and error if the client is not connected. You have to connect
/// before calling this function.
pub async fn send_scrape_request(
&self,
connection_id: ConnectionId,
transaction_id: TransactionId,
info_hashes: Vec<TorrustInfoHash>,
) -> anyhow::Result<Response> {
debug!("Sending scrape request with transaction id: {transaction_id:#?}");

let scrape_request = ScrapeRequest {
connection_id,
transaction_id,
info_hashes: info_hashes
.iter()
.map(|torrust_info_hash| InfoHash(torrust_info_hash.bytes()))
.collect(),
};

match &self.udp_tracker_client {
Some(client) => {
client.send(scrape_request.into()).await;

let response = client.receive().await;

debug!("scrape request response:\n{response:#?}");

Ok(response)
}
None => Err(ClientError::NotConnected.into()),
}
}
}
1 change: 1 addition & 0 deletions src/console/clients/udp/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod app;
pub mod checker;
Loading

0 comments on commit 82ec491

Please sign in to comment.