Skip to content

Commit

Permalink
Merge #797: Refactor: extract peer list in torrent repository entry
Browse files Browse the repository at this point in the history
40182b4 test: add tests for PeerList type (Jose Celano)
42f1b30 refactor: extract mod peer_list (Jose Celano)
922afda refactor: rename field from peers to swarm (Jose Celano)
4a567cd refactor: extract PeerList (Jose Celano)

Pull request description:

  This PR extracts a new type, "PeerList", used in the torrent repository entry.

  ### Why

  - It can be tested independently (unit tests and benchmarking).
  - Other implementations could be added in the future. This abstraction hides implementation details (the collection used).

  ### Performance

  It looks like it does not affect the performance.

  ```output
  Requests out: 406025.97/second
  Responses in: 365423.41/second
    - Connect responses:  180950.24
    - Announce responses: 180818.87
    - Scrape responses:   3654.30
    - Error responses:    0.00
  Peers per announce response: 0.00
  Announce responses per info hash:
    - p10: 1
    - p25: 1
    - p50: 1
    - p75: 1
    - p90: 2
    - p95: 3
    - p99: 104
    - p99.9: 295
    - p100: 367
  ```

ACKs for top commit:
  josecelano:
    ACK 40182b4

Tree-SHA512: 7051fadc2762654748f97ffaba0bdd99f3a8309e612c7f3ac53a5335b78be27dc0421ebe7d3425d563fd7c75e84690e08928f02a63dc8af2828a2eb454bce33b
  • Loading branch information
josecelano committed Apr 16, 2024
2 parents 52b7e3a + 40182b4 commit 7664ec6
Show file tree
Hide file tree
Showing 12 changed files with 362 additions and 56 deletions.
39 changes: 39 additions & 0 deletions packages/primitives/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,38 @@ pub mod fixture {
}

impl PeerBuilder {
#[allow(dead_code)]
#[must_use]
pub fn seeder() -> Self {
let peer = Peer {
peer_id: Id(*b"-qB00000000000000001"),
peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
uploaded: NumberOfBytes(0),
downloaded: NumberOfBytes(0),
left: NumberOfBytes(0),
event: AnnounceEvent::Completed,
};

Self { peer }
}

#[allow(dead_code)]
#[must_use]
pub fn leecher() -> Self {
let peer = Peer {
peer_id: Id(*b"-qB00000000000000002"),
peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 8080),
updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
uploaded: NumberOfBytes(0),
downloaded: NumberOfBytes(0),
left: NumberOfBytes(10),
event: AnnounceEvent::Started,
};

Self { peer }
}

#[allow(dead_code)]
#[must_use]
pub fn with_peer_id(mut self, peer_id: &Id) -> Self {
Expand Down Expand Up @@ -390,6 +422,13 @@ pub mod fixture {
self
}

#[allow(dead_code)]
#[must_use]
pub fn last_updated_on(mut self, updated: DurationSinceUnixEpoch) -> Self {
self.peer.updated = updated;
self
}

#[allow(dead_code)]
#[must_use]
pub fn build(self) -> Peer {
Expand Down
9 changes: 5 additions & 4 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;

//use serde::{Deserialize, Serialize};
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use self::peer_list::PeerList;

pub mod mutex_std;
pub mod mutex_tokio;
pub mod peer_list;
pub mod single;

pub trait Entry {
Expand Down Expand Up @@ -81,9 +83,8 @@ pub trait EntryAsync {
/// The tracker keeps one entry like this for every torrent.
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Torrent {
/// The swarm: a network of peers that are all trying to download the torrent associated to this entry
// #[serde(skip)]
pub(crate) peers: std::collections::BTreeMap<peer::Id, Arc<peer::Peer>>,
/// A network of peers that are all trying to download the torrent associated to this entry
pub(crate) swarm: PeerList,
/// The number of peers that have ever completed downloading the torrent associated to this entry
pub(crate) downloaded: u32,
}
289 changes: 289 additions & 0 deletions packages/torrent-repository/src/entry/peer_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
//! A peer list.
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

// code-review: the current implementation uses the peer Id as the ``BTreeMap``
// key. That would allow adding two identical peers except for the Id.
// For example, two peers with the same socket address but a different peer Id
// would be allowed. That would lead to duplicated peers in the tracker responses.

#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PeerList {
peers: std::collections::BTreeMap<peer::Id, Arc<peer::Peer>>,
}

impl PeerList {
#[must_use]
pub fn len(&self) -> usize {
self.peers.len()
}

#[must_use]
pub fn is_empty(&self) -> bool {
self.peers.is_empty()
}

pub fn upsert(&mut self, value: Arc<peer::Peer>) -> Option<Arc<peer::Peer>> {
self.peers.insert(value.peer_id, value)
}

pub fn remove(&mut self, key: &peer::Id) -> Option<Arc<peer::Peer>> {
self.peers.remove(key)
}

pub fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) {
self.peers
.retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff);
}

#[must_use]
pub fn get(&self, peer_id: &peer::Id) -> Option<&Arc<peer::Peer>> {
self.peers.get(peer_id)
}

#[must_use]
pub fn get_all(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match limit {
Some(limit) => self.peers.values().take(limit).cloned().collect(),
None => self.peers.values().cloned().collect(),
}
}

#[must_use]
pub fn seeders_and_leechers(&self) -> (usize, usize) {
let seeders = self.peers.values().filter(|peer| peer.is_seeder()).count();
let leechers = self.len() - seeders;

(seeders, leechers)
}

#[must_use]
pub fn get_peers_excluding_addr(&self, peer_addr: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match limit {
Some(limit) => self
.peers
.values()
// Take peers which are not the client peer
.filter(|peer| peer::ReadInfo::get_address(peer.as_ref()) != *peer_addr)
// Limit the number of peers on the result
.take(limit)
.cloned()
.collect(),
None => self
.peers
.values()
// Take peers which are not the client peer
.filter(|peer| peer::ReadInfo::get_address(peer.as_ref()) != *peer_addr)
.cloned()
.collect(),
}
}
}

#[cfg(test)]
mod tests {

mod it_should {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;

use torrust_tracker_primitives::peer::fixture::PeerBuilder;
use torrust_tracker_primitives::peer::{self};
use torrust_tracker_primitives::DurationSinceUnixEpoch;

use crate::entry::peer_list::PeerList;

#[test]
fn be_empty_when_no_peers_have_been_inserted() {
let peer_list = PeerList::default();

assert!(peer_list.is_empty());
}

#[test]
fn have_zero_length_when_no_peers_have_been_inserted() {
let peer_list = PeerList::default();

assert_eq!(peer_list.len(), 0);
}

#[test]
fn allow_inserting_a_new_peer() {
let mut peer_list = PeerList::default();

let peer = PeerBuilder::default().build();

assert_eq!(peer_list.upsert(peer.into()), None);
}

#[test]
fn allow_updating_a_preexisting_peer() {
let mut peer_list = PeerList::default();

let peer = PeerBuilder::default().build();

peer_list.upsert(peer.into());

assert_eq!(peer_list.upsert(peer.into()), Some(Arc::new(peer)));
}

#[test]
fn allow_getting_all_peers() {
let mut peer_list = PeerList::default();

let peer = PeerBuilder::default().build();

peer_list.upsert(peer.into());

assert_eq!(peer_list.get_all(None), [Arc::new(peer)]);
}

#[test]
fn allow_getting_one_peer_by_id() {
let mut peer_list = PeerList::default();

let peer = PeerBuilder::default().build();

peer_list.upsert(peer.into());

assert_eq!(peer_list.get(&peer.peer_id), Some(Arc::new(peer)).as_ref());
}

#[test]
fn increase_the_number_of_peers_after_inserting_a_new_one() {
let mut peer_list = PeerList::default();

let peer = PeerBuilder::default().build();

peer_list.upsert(peer.into());

assert_eq!(peer_list.len(), 1);
}

#[test]
fn decrease_the_number_of_peers_after_removing_one() {
let mut peer_list = PeerList::default();

let peer = PeerBuilder::default().build();

peer_list.upsert(peer.into());

peer_list.remove(&peer.peer_id);

assert!(peer_list.is_empty());
}

#[test]
fn allow_removing_an_existing_peer() {
let mut peer_list = PeerList::default();

let peer = PeerBuilder::default().build();

peer_list.upsert(peer.into());

peer_list.remove(&peer.peer_id);

assert_eq!(peer_list.get(&peer.peer_id), None);
}

#[test]
fn allow_getting_all_peers_excluding_peers_with_a_given_address() {
let mut peer_list = PeerList::default();

let peer1 = PeerBuilder::default()
.with_peer_id(&peer::Id(*b"-qB00000000000000001"))
.with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 6969))
.build();
peer_list.upsert(peer1.into());

let peer2 = PeerBuilder::default()
.with_peer_id(&peer::Id(*b"-qB00000000000000002"))
.with_peer_addr(&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 6969))
.build();
peer_list.upsert(peer2.into());

assert_eq!(peer_list.get_peers_excluding_addr(&peer2.peer_addr, None), [Arc::new(peer1)]);
}

#[test]
fn return_the_number_of_seeders_in_the_list() {
let mut peer_list = PeerList::default();

let seeder = PeerBuilder::seeder().build();
let leecher = PeerBuilder::leecher().build();

peer_list.upsert(seeder.into());
peer_list.upsert(leecher.into());

let (seeders, _leechers) = peer_list.seeders_and_leechers();

assert_eq!(seeders, 1);
}

#[test]
fn return_the_number_of_leechers_in_the_list() {
let mut peer_list = PeerList::default();

let seeder = PeerBuilder::seeder().build();
let leecher = PeerBuilder::leecher().build();

peer_list.upsert(seeder.into());
peer_list.upsert(leecher.into());

let (_seeders, leechers) = peer_list.seeders_and_leechers();

assert_eq!(leechers, 1);
}

#[test]
fn remove_inactive_peers() {
let mut peer_list = PeerList::default();
let one_second = DurationSinceUnixEpoch::new(1, 0);

// Insert the peer
let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0);
let peer = PeerBuilder::default().last_updated_on(last_update_time).build();
peer_list.upsert(peer.into());

// Remove peers not updated since one second after inserting the peer
peer_list.remove_inactive_peers(last_update_time + one_second);

assert_eq!(peer_list.len(), 0);
}

#[test]
fn not_remove_active_peers() {
let mut peer_list = PeerList::default();
let one_second = DurationSinceUnixEpoch::new(1, 0);

// Insert the peer
let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0);
let peer = PeerBuilder::default().last_updated_on(last_update_time).build();
peer_list.upsert(peer.into());

// Remove peers not updated since one second before inserting the peer.
peer_list.remove_inactive_peers(last_update_time - one_second);

assert_eq!(peer_list.len(), 1);
}

#[test]
fn allow_inserting_two_identical_peers_except_for_the_id() {
let mut peer_list = PeerList::default();

let peer1 = PeerBuilder::default()
.with_peer_id(&peer::Id(*b"-qB00000000000000001"))
.build();
peer_list.upsert(peer1.into());

let peer2 = PeerBuilder::default()
.with_peer_id(&peer::Id(*b"-qB00000000000000002"))
.build();
peer_list.upsert(peer2.into());

assert_eq!(peer_list.len(), 2);
}
}
}
Loading

0 comments on commit 7664ec6

Please sign in to comment.