Skip to content

Commit

Permalink
dev: torrent repository cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Mar 25, 2024
1 parent 3e0745b commit e18cae4
Show file tree
Hide file tree
Showing 26 changed files with 259 additions and 462 deletions.
1 change: 1 addition & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
"ostr",
"Pando",
"peekable",
"peerlist",
"proot",
"proto",
"Quickstart",
Expand Down
2 changes: 1 addition & 1 deletion packages/configuration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ use torrust_tracker_primitives::{DatabaseDriver, TrackerMode};
/// The maximum number of returned peers for a torrent.
pub const TORRENT_PEERS_LIMIT: usize = 74;

#[derive(Copy, Clone, Debug, PartialEq, Default, Constructor)]
#[derive(Copy, Clone, Debug, PartialEq, Constructor)]
pub struct TrackerPolicy {
pub remove_peerless_torrents: bool,
pub max_peer_timeout: u32,
Expand Down
2 changes: 1 addition & 1 deletion packages/primitives/src/announce_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};

/// Announce events. Described on the
/// [BEP 3. The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html)
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Serialize, Deserialize)]
#[derive(Hash, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum AnnounceEvent {
/// The peer has started downloading the torrent.
Started,
Expand Down
19 changes: 19 additions & 0 deletions packages/primitives/src/info_hash.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::hash::{DefaultHasher, Hash, Hasher};
use std::panic::Location;

use thiserror::Error;
Expand Down Expand Up @@ -77,6 +78,24 @@ impl std::convert::From<&[u8]> for InfoHash {
}
}

/// for testing
impl std::convert::From<&DefaultHasher> for InfoHash {
fn from(data: &DefaultHasher) -> InfoHash {
let n = data.finish().to_le_bytes();
InfoHash([
n[0], n[1], n[2], n[3], n[4], n[5], n[6], n[7], n[0], n[1], n[2], n[3], n[4], n[5], n[6], n[7], n[0], n[1], n[2],
n[3],
])
}
}

impl std::convert::From<&i32> for InfoHash {
fn from(n: &i32) -> InfoHash {
let n = n.to_le_bytes();
InfoHash([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, n[0], n[1], n[2], n[3]])
}
}

impl std::convert::From<[u8; 20]> for InfoHash {
fn from(val: [u8; 20]) -> Self {
InfoHash(val)
Expand Down
5 changes: 3 additions & 2 deletions packages/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! which is a `BitTorrent` tracker server. These structures are used not only
//! by the tracker server crate, but also by other crates in the Torrust
//! ecosystem.
use std::collections::BTreeMap;
use std::time::Duration;

use info_hash::InfoHash;
Expand Down Expand Up @@ -38,7 +39,7 @@ pub enum IPVersion {
}

/// Number of bytes downloaded, uploaded or pending to download (left) by the peer.
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Serialize, Deserialize)]
#[derive(Hash, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct NumberOfBytes(pub i64);

/// The database management system used by the tracker.
Expand All @@ -58,7 +59,7 @@ pub enum DatabaseDriver {
MySQL,
}

pub type PersistentTorrents = Vec<(InfoHash, u32)>;
pub type PersistentTorrents = BTreeMap<InfoHash, u32>;

/// The mode the tracker will run in.
///
Expand Down
8 changes: 2 additions & 6 deletions packages/primitives/src/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use derive_more::Constructor;
use serde::Deserialize;

/// A struct to keep information about the page when results are being paginated
#[derive(Deserialize, Copy, Clone, Debug, PartialEq)]
#[derive(Deserialize, Copy, Clone, Debug, PartialEq, Constructor)]
pub struct Pagination {
/// The page number, starting at 0
pub offset: u32,
Expand All @@ -10,11 +11,6 @@ pub struct Pagination {
}

impl Pagination {
#[must_use]
pub fn new(offset: u32, limit: u32) -> Self {
Self { offset, limit }
}

#[must_use]
pub fn new_with_options(offset_option: Option<u32>, limit_option: Option<u32>) -> Self {
let offset = match offset_option {
Expand Down
24 changes: 20 additions & 4 deletions packages/primitives/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{ser_unix_time_value, DurationSinceUnixEpoch, IPVersion, NumberOfByte
/// event: AnnounceEvent::Started,
/// };
/// ```
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Copy)]
#[derive(Debug, Clone, Serialize, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Peer {
/// ID used by the downloader peer
pub peer_id: Id,
Expand Down Expand Up @@ -173,6 +173,16 @@ impl From<[u8; 20]> for Id {
}
}

impl From<i32> for Id {
fn from(number: i32) -> Self {
let peer_id = number.to_le_bytes();
Id::from([
0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, peer_id[0], peer_id[1], peer_id[2],
peer_id[3],
])
}
}

impl TryFrom<Vec<u8>> for Id {
type Error = IdConversionError;

Expand Down Expand Up @@ -332,7 +342,7 @@ impl<P: Encoding> FromIterator<Peer> for Vec<P> {
}

pub mod fixture {
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use super::{Id, Peer};
use crate::announce_event::AnnounceEvent;
Expand Down Expand Up @@ -396,8 +406,8 @@ pub mod fixture {
impl Default for Peer {
fn default() -> Self {
Self {
peer_id: Id(*b"-qB00000000000000000"),
peer_addr: std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(126, 0, 0, 1)), 8080),
peer_id: Id::default(),
peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
uploaded: NumberOfBytes(0),
downloaded: NumberOfBytes(0),
Expand All @@ -406,6 +416,12 @@ pub mod fixture {
}
}
}

impl Default for Id {
fn default() -> Self {
Self(*b"-qB00000000000000000")
}
}
}

#[cfg(test)]
Expand Down
12 changes: 6 additions & 6 deletions packages/primitives/src/torrent_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ use std::ops::AddAssign;
#[derive(Copy, Clone, Debug, PartialEq, Default)]
pub struct TorrentsMetrics {
/// Total number of seeders for all torrents
pub seeders: u64,
pub complete: u64,
/// Total number of peers that have ever completed downloading for all torrents.
pub completed: u64,
pub downloaded: u64,
/// Total number of leechers for all torrents.
pub leechers: u64,
pub incomplete: u64,
/// Total number of torrents.
pub torrents: u64,
}

impl AddAssign for TorrentsMetrics {
fn add_assign(&mut self, rhs: Self) {
self.seeders += rhs.seeders;
self.completed += rhs.completed;
self.leechers += rhs.leechers;
self.complete += rhs.complete;
self.downloaded += rhs.downloaded;
self.incomplete += rhs.incomplete;
self.torrents += rhs.torrents;
}
}
31 changes: 15 additions & 16 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;

//use serde::{Deserialize, Serialize};
Expand All @@ -17,7 +18,7 @@ pub trait Entry {
fn get_stats(&self) -> SwarmMetadata;

/// Returns True if Still a Valid Entry according to the Tracker Policy
fn is_not_zombie(&self, policy: &TrackerPolicy) -> bool;
fn is_good(&self, policy: &TrackerPolicy) -> bool;

/// Returns True if the Peers is Empty
fn peers_is_empty(&self) -> bool;
Expand All @@ -33,7 +34,7 @@ pub trait Entry {
///
/// It filters out the input peer, typically because we want to return this
/// list of peers to that client peer.
fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;

/// It updates a peer and returns true if the number of complete downloads have increased.
///
Expand All @@ -51,28 +52,26 @@ pub trait Entry {
#[allow(clippy::module_name_repetitions)]
pub trait EntrySync {
fn get_stats(&self) -> SwarmMetadata;
fn is_not_zombie(&self, policy: &TrackerPolicy) -> bool;
fn is_good(&self, policy: &TrackerPolicy) -> bool;
fn peers_is_empty(&self) -> bool;
fn get_peers_len(&self) -> usize;
fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool;
fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
}

#[allow(clippy::module_name_repetitions)]
pub trait EntryAsync {
fn get_stats(self) -> impl std::future::Future<Output = SwarmMetadata> + Send;

#[allow(clippy::wrong_self_convention)]
fn is_not_zombie(self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
fn peers_is_empty(self) -> impl std::future::Future<Output = bool> + Send;
fn get_peers_len(self) -> impl std::future::Future<Output = usize> + Send;
fn get_peers(self, limit: Option<usize>) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
fn get_peers_for_peer(
self,
client: &peer::Peer,
fn get_stats(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
fn peers_is_empty(&self) -> impl std::future::Future<Output = bool> + Send;
fn get_peers_len(&self) -> impl std::future::Future<Output = usize> + Send;
fn get_peers(&self, limit: Option<usize>) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
fn get_peers_for_client(
&self,
client: &SocketAddr,
limit: Option<usize>,
) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
fn insert_or_update_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
Expand All @@ -88,11 +87,11 @@ pub trait EntryAsync {
/// This is the tracker entry for a given torrent and contains the swarm data,
/// that's the list of all the peers trying to download the same torrent.
/// The tracker keeps one entry like this for every torrent.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Torrent {
/// The swarm: a network of peers that are all trying to download the torrent associated to this entry
// #[serde(skip)]
pub(crate) peers: std::collections::BTreeMap<peer::Id, Arc<peer::Peer>>,
/// The number of peers that have ever completed downloading the torrent associated to this entry
pub(crate) completed: u32,
pub(crate) downloaded: u32,
}
17 changes: 12 additions & 5 deletions packages/torrent-repository/src/entry/mutex_std.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntrySync};
use crate::EntryMutexStd;
use crate::{EntryMutexStd, EntrySingle};

impl EntrySync for EntryMutexStd {
fn get_stats(&self) -> SwarmMetadata {
self.lock().expect("it should get a lock").get_stats()
}

fn is_not_zombie(&self, policy: &TrackerPolicy) -> bool {
self.lock().expect("it should get a lock").is_not_zombie(policy)
fn is_good(&self, policy: &TrackerPolicy) -> bool {
self.lock().expect("it should get a lock").is_good(policy)
}

fn peers_is_empty(&self) -> bool {
Expand All @@ -28,8 +29,8 @@ impl EntrySync for EntryMutexStd {
self.lock().expect("it should get lock").get_peers(limit)
}

fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().expect("it should get lock").get_peers_for_peer(client, limit)
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().expect("it should get lock").get_peers_for_client(client, limit)
}

fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool {
Expand All @@ -48,3 +49,9 @@ impl EntrySync for EntryMutexStd {
.remove_inactive_peers(current_cutoff);
}
}

impl From<EntrySingle> for EntryMutexStd {
fn from(entry: EntrySingle) -> Self {
Arc::new(std::sync::Mutex::new(entry))
}
}
25 changes: 16 additions & 9 deletions packages/torrent-repository/src/entry/mutex_tokio.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntryAsync};
use crate::EntryMutexTokio;
use crate::{EntryMutexTokio, EntrySingle};

impl EntryAsync for EntryMutexTokio {
async fn get_stats(self) -> SwarmMetadata {
async fn get_stats(&self) -> SwarmMetadata {
self.lock().await.get_stats()
}

async fn is_not_zombie(self, policy: &TrackerPolicy) -> bool {
self.lock().await.is_not_zombie(policy)
async fn check_good(self, policy: &TrackerPolicy) -> bool {
self.lock().await.is_good(policy)
}

async fn peers_is_empty(self) -> bool {
async fn peers_is_empty(&self) -> bool {
self.lock().await.peers_is_empty()
}

async fn get_peers_len(self) -> usize {
async fn get_peers_len(&self) -> usize {
self.lock().await.get_peers_len()
}

async fn get_peers(self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
async fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().await.get_peers(limit)
}

async fn get_peers_for_peer(self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().await.get_peers_for_peer(client, limit)
async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().await.get_peers_for_client(client, limit)
}

async fn insert_or_update_peer(self, peer: &peer::Peer) -> bool {
Expand All @@ -44,3 +45,9 @@ impl EntryAsync for EntryMutexTokio {
self.lock().await.remove_inactive_peers(current_cutoff);
}
}

impl From<EntrySingle> for EntryMutexTokio {
fn from(entry: EntrySingle) -> Self {
Arc::new(tokio::sync::Mutex::new(entry))
}
}
Loading

0 comments on commit e18cae4

Please sign in to comment.