Skip to content

Commit

Permalink
Merge #778: Performance optimization: create a new torrent repository…
Browse files Browse the repository at this point in the history
… using a `SkipMap` instead of a `BTreeMap`

12f54e7 test: add tests for new torrent repository using SkipMap (Jose Celano)
0989285 refactor: separate torrent repository trait from implementations (Jose Celano)
eec2024 chore: ignore crossbeam-skiplist crate in cargo-machete (Jose Celano)
642d6be feat: new torrent repository using crossbeam_skiplist::SkipMap (Jose Celano)
608585e chore: add new cargo dependency: crossbeam-skiplist (Jose Celano)

Pull request description:

  This PR implements a new Torrent Repository replacing the outer BTreeMap for torrents with a [SkipMap](https://docs.rs/crossbeam-skiplist/latest/crossbeam_skiplist/).

  ### Why

  - It is straightforward to implement because the API is very similar. In fact, SkipMap is a replacement for BTreeMap that allows concurrency in adding new torrents.
  - One problem with BTreeMap is that you have to lock the entire structure to add a new torrent. SkipMap is a lock-free structure.
  - It is a lock-free structure that uses Atomics internally, which is more lightweight than locks.
  - Unlike the DashMap implementation, it does not use unsafe code. However:

  > NOTICE: Race conditions could be introduced if the implementation was incorrect. See https://docs.rs/crossbeam-skiplist/latest/crossbeam_skiplist/#concurrent-access

  ### Benchmarking

  Running the Aquatic UDP load test gives the same results:

  Current best implementation:

  ```output

  Requests out: 397287.37/second
  Responses in: 357549.15/second
    - Connect responses:  177073.94
    - Announce responses: 176905.36
    - Scrape responses:   3569.85
    - Error responses:    0.00
  Peers per announce response: 0.00
  Announce responses per info hash:
    - p10: 1
    - p25: 1
    - p50: 1
    - p75: 1
    - p90: 2
    - p95: 3
    - p99: 104
    - p99.9: 287
    - p100: 371
  ```

  SkipMap:

  ```output
  Requests out: 396788.68/second
  Responses in: 357105.27/second
    - Connect responses:  176662.91
    - Announce responses: 176863.44
    - Scrape responses:   3578.91
    - Error responses:    0.00
  Peers per announce response: 0.00
  Announce responses per info hash:
    - p10: 1
    - p25: 1
    - p50: 1
    - p75: 1
    - p90: 2
    - p95: 3
    - p99: 105
    - p99.9: 287
    - p100: 351
  ```

  However, benchmarking the repositories using Criterion shows better results in adding and updating multiple torrents in parallel.

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/4f0736f1-3e70-4a55-8084-3265e9cd087d)

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/a9c7a034-dbd3-40a0-85d5-c884f1286964)

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/a3a8f37c-4230-4cfb-9adb-2c4d72ab9fd7)

  ![image](https://github.com/torrust/torrust-tracker/assets/58816/43a2d9cb-ccbc-496c-9d57-a98887f1575c)

  ### Conclusion

  I think we car merge it but I would also continue with the DashMap implementation to compare.

ACKs for top commit:
  josecelano:
    ACK 12f54e7

Tree-SHA512: 0f300360abe5f70cef21bb5c583ecadedd5a1b233125951d90ffe510e551a9ee7b41ba6dacd23176656fb18095645fdde24a23c23d901e964300e3e1257b3b71
  • Loading branch information
josecelano committed Apr 8, 2024
2 parents 2277099 + 12f54e7 commit 6eff113
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 131 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ chrono = { version = "0", default-features = false, features = ["clock"] }
clap = { version = "4", features = ["derive", "env"] }
colored = "2"
config = "0"
crossbeam-skiplist = "0.1"
derive_more = "0"
fern = "0"
futures = "0"
Expand All @@ -63,8 +64,8 @@ serde_json = "1"
serde_repr = "0"
thiserror = "1"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "packages/clock" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" }
torrust-tracker-contrib-bencode = { version = "3.0.0-alpha.12-develop", path = "contrib/bencode" }
torrust-tracker-located-error = { version = "3.0.0-alpha.12-develop", path = "packages/located-error" }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "packages/primitives" }
Expand All @@ -76,7 +77,7 @@ url = "2"
uuid = { version = "1", features = ["v4"] }

[package.metadata.cargo-machete]
ignored = ["serde_bytes"]
ignored = ["serde_bytes", "crossbeam-skiplist"]

[dev-dependencies]
local-ip-address = "0"
Expand Down Expand Up @@ -105,4 +106,4 @@ opt-level = 3

[profile.release-debug]
inherits = "release"
debug = true
debug = true
1 change: 1 addition & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"Shareaza",
"sharktorrent",
"SHLVL",
"skiplist",
"socketaddr",
"sqllite",
"subsec",
Expand Down
5 changes: 3 additions & 2 deletions packages/torrent-repository/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ rust-version.workspace = true
version.workspace = true

[dependencies]
crossbeam-skiplist = "0.1"
futures = "0.3.29"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }

[dev-dependencies]
criterion = { version = "0", features = ["async_tokio"] }
Expand Down
21 changes: 20 additions & 1 deletion packages/torrent-repository/benches/repository_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod helpers;
use criterion::{criterion_group, criterion_main, Criterion};
use torrust_tracker_torrent_repository::{
TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd,
TorrentsRwLockTokioMutexTokio,
TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
};

use crate::helpers::{asyn, sync};
Expand Down Expand Up @@ -45,6 +45,10 @@ fn add_one_torrent(c: &mut Criterion) {
.iter_custom(asyn::add_one_torrent::<TorrentsRwLockTokioMutexTokio, _>);
});

group.bench_function("SkipMapMutexStd", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
});

group.finish();
}

Expand Down Expand Up @@ -89,6 +93,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -133,6 +142,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -178,6 +192,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
});
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down
17 changes: 11 additions & 6 deletions packages/torrent-repository/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::sync::Arc;

use repository::rw_lock_std::RwLockStd;
use repository::rw_lock_tokio::RwLockTokio;
use repository::skip_map_mutex_std::CrossbeamSkipList;
use torrust_tracker_clock::clock;

pub mod entry;
Expand All @@ -9,12 +12,14 @@ pub type EntrySingle = entry::Torrent;
pub type EntryMutexStd = Arc<std::sync::Mutex<entry::Torrent>>;
pub type EntryMutexTokio = Arc<tokio::sync::Mutex<entry::Torrent>>;

pub type TorrentsRwLockStd = repository::RwLockStd<EntrySingle>;
pub type TorrentsRwLockStdMutexStd = repository::RwLockStd<EntryMutexStd>;
pub type TorrentsRwLockStdMutexTokio = repository::RwLockStd<EntryMutexTokio>;
pub type TorrentsRwLockTokio = repository::RwLockTokio<EntrySingle>;
pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio<EntryMutexTokio>;
pub type TorrentsRwLockStd = RwLockStd<EntrySingle>;
pub type TorrentsRwLockStdMutexStd = RwLockStd<EntryMutexStd>;
pub type TorrentsRwLockStdMutexTokio = RwLockStd<EntryMutexTokio>;
pub type TorrentsRwLockTokio = RwLockTokio<EntrySingle>;
pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio>;

pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;

/// This code needs to be copied into each crate.
/// Working version, for production.
Expand Down
35 changes: 1 addition & 34 deletions packages/torrent-repository/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod rw_lock_std_mutex_tokio;
pub mod rw_lock_tokio;
pub mod rw_lock_tokio_mutex_std;
pub mod rw_lock_tokio_mutex_tokio;
pub mod skip_map_mutex_std;

use std::fmt::Debug;

Expand Down Expand Up @@ -40,37 +41,3 @@ pub trait RepositoryAsync<T>: Debug + Default + Sized + 'static {
peer: &peer::Peer,
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + Send;
}

#[derive(Default, Debug)]
pub struct RwLockStd<T> {
torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

#[derive(Default, Debug)]
pub struct RwLockTokio<T> {
torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

impl<T> RwLockStd<T> {
/// # Panics
///
/// Panics if unable to get a lock.
pub fn write(
&self,
) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>> {
self.torrents.write().expect("it should get lock")
}
}

impl<T> RwLockTokio<T> {
pub fn write(
&self,
) -> impl std::future::Future<
Output = tokio::sync::RwLockWriteGuard<
'_,
std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>,
>,
> {
self.torrents.write()
}
}
16 changes: 16 additions & 0 deletions packages/torrent-repository/src/repository/rw_lock_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ use super::Repository;
use crate::entry::Entry;
use crate::{EntrySingle, TorrentsRwLockStd};

#[derive(Default, Debug)]
pub struct RwLockStd<T> {
pub(crate) torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

impl<T> RwLockStd<T> {
/// # Panics
///
/// Panics if unable to get a lock.
pub fn write(
&self,
) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>> {
self.torrents.write().expect("it should get lock")
}
}

impl TorrentsRwLockStd {
fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap<InfoHash, EntrySingle>>
where
Expand Down
18 changes: 18 additions & 0 deletions packages/torrent-repository/src/repository/rw_lock_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ use super::RepositoryAsync;
use crate::entry::Entry;
use crate::{EntrySingle, TorrentsRwLockTokio};

#[derive(Default, Debug)]
pub struct RwLockTokio<T> {
pub(crate) torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

impl<T> RwLockTokio<T> {
pub fn write(
&self,
) -> impl std::future::Future<
Output = tokio::sync::RwLockWriteGuard<
'_,
std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>,
>,
> {
self.torrents.write()
}
}

impl TorrentsRwLockTokio {
async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap<InfoHash, EntrySingle>>
where
Expand Down
106 changes: 106 additions & 0 deletions packages/torrent-repository/src/repository/skip_map_mutex_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use crossbeam_skiplist::SkipMap;
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::pagination::Pagination;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};

use super::Repository;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};

#[derive(Default, Debug)]
pub struct CrossbeamSkipList<T> {
pub torrents: SkipMap<InfoHash, T>,
}

impl Repository<EntryMutexStd> for CrossbeamSkipList<EntryMutexStd>
where
EntryMutexStd: EntrySync,
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
let entry = self.torrents.get_or_insert(*info_hash, Arc::default());
entry.value().insert_or_update_peer_and_get_stats(peer)
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.value().clone())
}

fn get_metrics(&self) -> TorrentsMetrics {
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().lock().expect("it should get a lock").get_stats();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

metrics
}

fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> {
match pagination {
Some(pagination) => self
.torrents
.iter()
.skip(pagination.offset as usize)
.take(pagination.limit as usize)
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
None => self
.torrents
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
}
}

fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
for (info_hash, completed) in persistent_torrents {
if self.torrents.contains_key(info_hash) {
continue;
}

let entry = EntryMutexStd::new(
EntrySingle {
peers: BTreeMap::default(),
downloaded: *completed,
}
.into(),
);

// Since SkipMap is lock-free the torrent could have been inserted
// after checking if it exists.
self.torrents.get_or_insert(*info_hash, entry);
}
}

fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd> {
self.torrents.remove(key).map(|entry| entry.value().clone())
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
for entry in &self.torrents {
entry.value().remove_inactive_peers(current_cutoff);
}
}

fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
for entry in &self.torrents {
if entry.value().is_good(policy) {
continue;
}

entry.remove();
}
}
}
Loading

0 comments on commit 6eff113

Please sign in to comment.