Skip to content

Commit

Permalink
feat: new torrent repo implementation using parking_lot RwLock
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Apr 16, 2024
1 parent 5750e2c commit 9258ac0
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 28 deletions.
22 changes: 21 additions & 1 deletion packages/torrent-repository/benches/repository_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod helpers;
use criterion::{criterion_group, criterion_main, Criterion};
use torrust_tracker_torrent_repository::{
TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, TorrentsSkipMapRwLockParkingLot,
};

use crate::helpers::{asyn, sync};
Expand Down Expand Up @@ -49,6 +49,10 @@ fn add_one_torrent(c: &mut Criterion) {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapRwLockParkingLot, _>);
});

group.bench_function("DashMapMutexStd", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsDashMapMutexStd, _>);
});
Expand Down Expand Up @@ -102,6 +106,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapRwLockParkingLot, _>(&rt, iters, None));
});

group.bench_function("DashMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
Expand Down Expand Up @@ -156,6 +165,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapRwLockParkingLot, _>(&rt, iters, None));
});

group.bench_function("DashMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
Expand Down Expand Up @@ -211,6 +225,12 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.to_async(&rt).iter_custom(|iters| {
sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapRwLockParkingLot, _>(&rt, iters, None)
});
});

group.bench_function("DashMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
Expand Down
1 change: 1 addition & 0 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use self::peer_list::PeerList;

pub mod mutex_parking_lot;
pub mod mutex_std;
pub mod mutex_tokio;
pub mod peer_list;
Expand Down
49 changes: 49 additions & 0 deletions packages/torrent-repository/src/entry/mutex_parking_lot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntrySync};
use crate::{EntryRwLockParkingLot, EntrySingle};

impl EntrySync for EntryRwLockParkingLot {
fn get_swarm_metadata(&self) -> SwarmMetadata {
self.read().get_swarm_metadata()
}

fn is_good(&self, policy: &TrackerPolicy) -> bool {
self.read().is_good(policy)
}

fn peers_is_empty(&self) -> bool {
self.read().peers_is_empty()
}

fn get_peers_len(&self) -> usize {
self.read().get_peers_len()
}

fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.read().get_peers(limit)
}

fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.read().get_peers_for_client(client, limit)
}

fn upsert_peer(&self, peer: &peer::Peer) -> bool {
self.write().upsert_peer(peer)
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
self.write().remove_inactive_peers(current_cutoff);
}
}

impl From<EntrySingle> for EntryRwLockParkingLot {
fn from(entry: EntrySingle) -> Self {
Arc::new(parking_lot::RwLock::new(entry))
}
}
7 changes: 7 additions & 0 deletions packages/torrent-repository/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ use torrust_tracker_clock::clock;
pub mod entry;
pub mod repository;

// Repo Entries

pub type EntrySingle = entry::Torrent;
pub type EntryMutexStd = Arc<std::sync::Mutex<entry::Torrent>>;
pub type EntryMutexTokio = Arc<tokio::sync::Mutex<entry::Torrent>>;
pub type EntryRwLockParkingLot = Arc<parking_lot::RwLock<entry::Torrent>>;

// Repos

pub type TorrentsRwLockStd = RwLockStd<EntrySingle>;
pub type TorrentsRwLockStdMutexStd = RwLockStd<EntryMutexStd>;
Expand All @@ -21,6 +26,8 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio>;

pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;
pub type TorrentsSkipMapRwLockParkingLot = CrossbeamSkipList<EntryRwLockParkingLot>;

pub type TorrentsDashMapMutexStd = XacrimonDashMap<EntryMutexStd>;

/// This code needs to be copied into each crate.
Expand Down
93 changes: 92 additions & 1 deletion packages/torrent-repository/src/repository/skip_map_mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent
use super::Repository;
use crate::entry::peer_list::PeerList;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};
use crate::{EntryMutexStd, EntryRwLockParkingLot, EntrySingle};

#[derive(Default, Debug)]
pub struct CrossbeamSkipList<T> {
Expand Down Expand Up @@ -108,3 +108,94 @@ where
}
}
}

impl Repository<EntryRwLockParkingLot> for CrossbeamSkipList<EntryRwLockParkingLot>
where
EntryRwLockParkingLot: EntrySync,
EntrySingle: Entry,
{
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
let entry = self.torrents.get_or_insert(*info_hash, Arc::default());
entry.value().upsert_peer(peer);
}

fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata())
}

fn get(&self, key: &InfoHash) -> Option<EntryRwLockParkingLot> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.value().clone())
}

fn get_metrics(&self) -> TorrentsMetrics {
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().read().get_swarm_metadata();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

metrics
}

fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryRwLockParkingLot)> {
match pagination {
Some(pagination) => self
.torrents
.iter()
.skip(pagination.offset as usize)
.take(pagination.limit as usize)
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
None => self
.torrents
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
}
}

fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
for (info_hash, completed) in persistent_torrents {
if self.torrents.contains_key(info_hash) {
continue;
}

let entry = EntryRwLockParkingLot::new(
EntrySingle {
swarm: PeerList::default(),
downloaded: *completed,
}
.into(),
);

// Since SkipMap is lock-free the torrent could have been inserted
// after checking if it exists.
self.torrents.get_or_insert(*info_hash, entry);
}
}

fn remove(&self, key: &InfoHash) -> Option<EntryRwLockParkingLot> {
self.torrents.remove(key).map(|entry| entry.value().clone())
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
for entry in &self.torrents {
entry.value().remove_inactive_peers(current_cutoff);
}
}

fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
for entry in &self.torrents {
if entry.value().is_good(policy) {
continue;
}

entry.remove();
}
}
}
18 changes: 18 additions & 0 deletions packages/torrent-repository/tests/common/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use torrust_tracker_torrent_repository::repository::{Repository as _, Repository
use torrust_tracker_torrent_repository::{
EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio,
TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
TorrentsSkipMapRwLockParkingLot,
};

#[derive(Debug)]
Expand All @@ -19,6 +20,7 @@ pub(crate) enum Repo {
RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd),
RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio),
SkipMapMutexStd(TorrentsSkipMapMutexStd),
SkipMapRwLockParkingLot(TorrentsSkipMapRwLockParkingLot),
DashMapMutexStd(TorrentsDashMapMutexStd),
}

Expand All @@ -32,6 +34,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await,
Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await,
Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer),
Repo::SkipMapRwLockParkingLot(repo) => repo.upsert_peer(info_hash, peer),
Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer),
}
}
Expand All @@ -45,6 +48,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.get_swarm_metadata(info_hash).await,
Repo::RwLockTokioMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await,
Repo::SkipMapMutexStd(repo) => repo.get_swarm_metadata(info_hash),
Repo::SkipMapRwLockParkingLot(repo) => repo.get_swarm_metadata(info_hash),
Repo::DashMapMutexStd(repo) => repo.get_swarm_metadata(info_hash),
}
}
Expand All @@ -58,6 +62,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
Repo::SkipMapRwLockParkingLot(repo) => Some(repo.get(key)?.read().clone()),
Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
}
}
Expand All @@ -71,6 +76,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await,
Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await,
Repo::SkipMapMutexStd(repo) => repo.get_metrics(),
Repo::SkipMapRwLockParkingLot(repo) => repo.get_metrics(),
Repo::DashMapMutexStd(repo) => repo.get_metrics(),
}
}
Expand Down Expand Up @@ -111,6 +117,11 @@ impl Repo {
.iter()
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
.collect(),
Repo::SkipMapRwLockParkingLot(repo) => repo
.get_paginated(pagination)
.iter()
.map(|(i, t)| (*i, t.read().clone()))
.collect(),
Repo::DashMapMutexStd(repo) => repo
.get_paginated(pagination)
.iter()
Expand All @@ -128,6 +139,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
Repo::SkipMapRwLockParkingLot(repo) => repo.import_persistent(persistent_torrents),
Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
}
}
Expand All @@ -141,6 +153,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
Repo::SkipMapRwLockParkingLot(repo) => Some(repo.remove(key)?.write().clone()),
Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
}
}
Expand All @@ -154,6 +167,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::SkipMapRwLockParkingLot(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
}
}
Expand All @@ -167,6 +181,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
Repo::SkipMapRwLockParkingLot(repo) => repo.remove_peerless_torrents(policy),
Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
}
}
Expand Down Expand Up @@ -194,6 +209,9 @@ impl Repo {
Repo::SkipMapMutexStd(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
Repo::SkipMapRwLockParkingLot(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
Repo::DashMapMutexStd(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
Expand Down
Loading

0 comments on commit 9258ac0

Please sign in to comment.