Skip to content

Commit

Permalink
Merge pull request #49 from mandreyel/share-downloads
Browse files Browse the repository at this point in the history
Start sharing piece downloads among peers for faster completion
  • Loading branch information
vimpunk authored Nov 14, 2020
2 parents c050ed8 + 47cd0e5 commit 82cea62
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 91 deletions.
16 changes: 13 additions & 3 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,19 @@ arrives.

## Piece download

A piece download tracks the piece completion of an ongoing piece download. It
will play an important role in optimizing download performance, but none of
that is implemented for now (see research notes).
A piece download tracks the piece completion of an ongoing piece download.

It plays an important role in optimizing download performance:
- sharing block requests in piece among peers,
- timing out peer requests,
- and end game mode to speed up the last part of the
download.

Currently the last two are not implemented.

Piece downloads are stored in the torrent and shared with all peers in the
torrent. When a peer starts a new download, it places the download instance in
the shared torrent object. This way other peers may join this download.


## Disk IO
Expand Down
6 changes: 5 additions & 1 deletion cratetorrent/src/disk/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,17 @@ impl Disk {
pub(super) async fn start(&mut self) -> Result<()> {
log::info!("Starting disk IO event loop");
while let Some(cmd) = self.cmd_port.recv().await {
log::debug!("Disk received command");
match cmd {
Command::NewTorrent {
id,
info,
piece_hashes,
} => {
log::trace!(
"Disk received NewTorrent command: id={}, info={:?}",
id,
info
);
if self.torrents.contains_key(&id) {
log::warn!("Torrent {} already allocated", id);
self.alert_chan.send(Alert::TorrentAllocation(Err(
Expand Down
6 changes: 3 additions & 3 deletions cratetorrent/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl PieceDownload {
/// Picks the requested number of blocks or fewer, if fewer are remaining.
pub fn pick_blocks(&mut self, count: usize, blocks: &mut Vec<BlockInfo>) {
log::trace!(
"Picking {} block(s) in piece {} (length: {}, blocks: {})",
"Trying to pick {} block(s) in piece {} (length: {}, blocks: {})",
count,
self.index,
self.len,
Expand Down Expand Up @@ -73,14 +73,14 @@ impl PieceDownload {
}

if picked > 0 {
log::debug!(
log::trace!(
"Picked {} block(s) for piece {}: {:?}",
picked,
self.index,
&blocks[blocks.len() - picked..]
);
} else {
log::debug!("Cannot pick any blocks in piece {}", self.index);
log::trace!("Cannot pick any blocks in piece {}", self.index);
}
}

Expand Down
154 changes: 87 additions & 67 deletions cratetorrent/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use {
SinkExt, StreamExt,
},
std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
Expand All @@ -25,7 +24,6 @@ use {
crate::{
disk::DiskHandle, download::PieceDownload, error::*,
piece_picker::PiecePicker, torrent::SharedStatus, Bitfield, BlockInfo,
PieceIndex,
},
codec::*,
state::*,
Expand All @@ -48,9 +46,6 @@ pub(crate) struct PeerSession {
addr: SocketAddr,
/// Session related information.
state: SessionState,
/// These are the active piece downloads in which this session is
/// participating.
downloads: HashMap<PieceIndex, PieceDownload>,
/// Our pending requests that we sent to peer. It represents the blocks that
/// we are expecting. Thus, if we receive a block that is not in this list,
/// we need to drop it. If we receive a block whose request entry is in
Expand Down Expand Up @@ -89,7 +84,6 @@ impl PeerSession {
cmd_port: cmd_port.fuse(),
addr,
state: SessionState::default(),
downloads: HashMap::new(),
outgoing_requests: Vec::new(),
},
cmd_chan,
Expand Down Expand Up @@ -263,16 +257,24 @@ impl PeerSession {
if let Some(last_outgoing_request_time) =
self.state.last_outgoing_request_time
{
let elapsed_since_last_request =
last_outgoing_request_time.elapsed();
if elapsed_since_last_request > self.state.request_timeout() {
let elapsed_since_last_request = Instant::now()
.saturating_duration_since(last_outgoing_request_time);
let request_timeout = self.state.request_timeout();
log::debug!(
"[Peer {}] Checking request timeout \
(last {} ms ago, timeout: {} ms)",
self.addr,
elapsed_since_last_request.as_millis(),
request_timeout.as_millis()
);
if elapsed_since_last_request > request_timeout {
log::warn!(
"[Peer {}] timeout after {} ms (timeouts: {})",
self.addr,
elapsed_since_last_request.as_millis(),
self.state.timed_out_request_count + 1,
);
self.state.record_request_timeout();
self.state.register_request_timeout();
// Cancel all requests and re-issue a single one (since we can
// only request a single block now). Start by freeing up the
// block in its piece download.
Expand All @@ -283,9 +285,14 @@ impl PeerSession {
//
// TODO(https://github.com/mandreyel/cratetorrent/issues/11):
// can we handle this without unwrapping?
self.downloads
self.torrent
.downloads
.write()
.await
.get_mut(&block.piece_index)
.expect("No corresponding PieceDownload for request")
.write()
.await
.cancel_request(block);
}
self.outgoing_requests.clear();
Expand Down Expand Up @@ -347,10 +354,12 @@ impl PeerSession {
return Err(Error::PeerNotSeed);
}

// register peer's pieces with piece picker
let mut piece_picker = self.piece_picker.write().await;
self.state.is_interested =
piece_picker.register_availability(&bitfield)?;
// register peer's pieces with piece picker and determine interest in it
self.state.is_interested = self
.piece_picker
.write()
.await
.register_availability(&bitfield)?;
debug_assert!(self.state.is_interested);
if let Some(peer_info) = &mut self.state.peer {
peer_info.pieces = Some(bitfield);
Expand Down Expand Up @@ -477,36 +486,33 @@ impl PeerSession {

// TODO: optimize this by preallocating the vector in self
let mut blocks = Vec::new();
let target_request_queue_len =
self.state.target_request_queue_len.unwrap_or_default();

// If we have active downloads, prefer to continue those. This will
// result in less in-progress pieces.
for download in self.downloads.values_mut() {
log::debug!(
"Peer {} trying to continue download {}",
self.addr,
download.piece_index()
);

for download in self.torrent.downloads.write().await.values_mut() {
// check and calculate the number of requests we can make now
let outgoing_request_count =
blocks.len() + self.outgoing_requests.len();
// our outgoing request queue shouldn't exceed the allowed request
// queue size
debug_assert!(
self.state.target_request_queue_len.unwrap_or_default()
>= outgoing_request_count
);
// the number of requests we can make now
let to_request_count =
self.state.target_request_queue_len.unwrap_or_default()
- outgoing_request_count;
if to_request_count == 0 {
if outgoing_request_count >= target_request_queue_len {
break;
}
let to_request_count =
target_request_queue_len - outgoing_request_count;

// TODO: should we not check first that we aren't already
// downloading all of the piece's blocks?
// TODO: should we check first that we aren't already downloading
// all of the piece's blocks? requires read then write

download.pick_blocks(to_request_count, &mut blocks);
let mut download_write_guard = download.write().await;
log::trace!(
"Peer {} trying to continue download {}",
self.addr,
download_write_guard.piece_index()
);
download_write_guard.pick_blocks(to_request_count, &mut blocks);
}

// while we can make more requests we start new download(s)
Expand All @@ -515,21 +521,15 @@ impl PeerSession {
blocks.len() + self.outgoing_requests.len();
// our outgoing request queue shouldn't exceed the allowed request
// queue size
debug_assert!(
self.state.target_request_queue_len.unwrap_or_default()
>= outgoing_request_count
);
let to_request_count =
self.state.target_request_queue_len.unwrap_or_default()
- outgoing_request_count;
if to_request_count == 0 {
if outgoing_request_count >= target_request_queue_len {
break;
}
let to_request_count =
target_request_queue_len - outgoing_request_count;

log::debug!("Session {} starting new piece download", self.addr);

let mut piece_picker = self.piece_picker.write().await;
if let Some(index) = piece_picker.pick_piece() {
if let Some(index) = self.piece_picker.write().await.pick_piece() {
log::info!("Session {} picked piece {}", self.addr, index);

let mut download = PieceDownload::new(
Expand All @@ -539,11 +539,18 @@ impl PeerSession {

download.pick_blocks(to_request_count, &mut blocks);
// save download
self.downloads.insert(index, download);
self.torrent
.downloads
.write()
.await
.insert(index, RwLock::new(download));
} else {
log::debug!(
"Could not pick more pieces from peer {}",
self.addr
"Could not pick more pieces from peer {} (pending: \
pieces: {}, blocks: {})",
self.addr,
self.torrent.downloads.read().await.len(),
self.outgoing_requests.len(),
);
break;
}
Expand All @@ -556,19 +563,17 @@ impl PeerSession {
self.addr,
self.outgoing_requests.len()
);
let request_time = Instant::now();
self.state.last_outgoing_request_time = Some(Instant::now());
// save current volley of requests
self.outgoing_requests.extend_from_slice(&blocks);
// make the actual requests
for block in blocks.iter() {
// TODO: batch these in a single syscall, or is this already
// being done by the tokio codec type?
sink.send(Message::Request(*block)).await?;
self.state.uploaded_protocol_counter +=
MessageId::Request.header_len();
}

self.state.last_outgoing_request_time = Some(request_time);
self.state.uploaded_protocol_counter +=
blocks.len() as u64 * MessageId::Request.header_len();
}

Ok(())
Expand Down Expand Up @@ -611,10 +616,31 @@ impl PeerSession {
// remove block from our pending requests queue
self.outgoing_requests.remove(request_pos);

let piece_index = block_info.piece_index;
// mark the block as downloaded with its respective piece
// download instance
let download = self.downloads.get_mut(&piece_index);
self.register_downloaded_block(&block_info).await;

// update download stats
self.state.update_download_stats(block_info.len);

// validate and save the block to disk by sending a write command to the
// disk task
self.disk.write_block(self.torrent.id, block_info, data)?;

Ok(())
}

/// Marks the newly downloaded block in the piece picker and its piece
/// download instance.
///
/// If the block completes the piece, the piece download is removed from the
/// shared download map and the piece is marked as complete in the piece
/// picker.
async fn register_downloaded_block(&self, block_info: &BlockInfo) {
let mut downloads_write_guard = self.torrent.downloads.write().await;

let piece_index = block_info.piece_index;
let download = downloads_write_guard.get_mut(&piece_index);
// this fires as a result of a broken invariant: we
// shouldn't have an entry in `outgoing_requests` without a
// corresponding entry in `downloads`
Expand All @@ -623,25 +649,19 @@ impl PeerSession {
// handle this without unwrapping?
let download =
download.expect("No corresponding PieceDownload for request");
download.received_block(&block_info);
download.write().await.received_block(&block_info);

// finish download of piece if this was the last missing block in it
if download.is_complete() {
if download.read().await.is_complete() {
log::info!("Finished piece {} via peer {}", piece_index, self.addr);
// remove piece download from `downloads`
downloads_write_guard.remove(&piece_index);
// drop the write guard to avoid holding two write locks that could
// later cause deadlocks
drop(downloads_write_guard);
// register received piece
self.piece_picker.write().await.received_piece(piece_index);
// remove piece download from `downloads`
self.downloads.remove(&piece_index);
}

// update download stats
self.state.update_download_stats(block_info.len);

// validate and save the block to disk by sending a write command to the
// disk task
self.disk.write_block(self.torrent.id, block_info, data)?;

Ok(())
}
}

Expand Down
Loading

0 comments on commit 82cea62

Please sign in to comment.