Skip to content

Commit

Permalink
Checkpoint background download.
Browse files Browse the repository at this point in the history
  • Loading branch information
ehuss committed Oct 18, 2018
1 parent cbde1c6 commit 872bcb6
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 104 deletions.
112 changes: 89 additions & 23 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::fmt;
use std::hash;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::{Instant, Duration};

use bytesize::ByteSize;
Expand All @@ -19,7 +21,8 @@ use core::{FeatureMap, SourceMap, Summary};
use core::source::MaybePackage;
use core::interning::InternedString;
use ops;
use util::{self, internal, lev_distance, Config, Progress, ProgressStyle};
use sources::registry::unpack_package;
use util::{self, internal, lev_distance, Config, FileLock, Filesystem, Progress, ProgressStyle};
use util::errors::{CargoResult, CargoResultExt, HttpNot200};
use util::network::Retry;

Expand Down Expand Up @@ -266,6 +269,9 @@ pub struct Downloads<'a, 'cfg: 'a> {
largest: (u64, String),
start: Instant,
success: bool,
save_work_tx: Sender<(PackageId, Filesystem, FileLock)>,
save_result_rx: Receiver<CargoResult<(PackageId, FileLock)>>,
extracting: usize,
}

struct Download {
Expand Down Expand Up @@ -322,6 +328,11 @@ impl<'cfg> PackageSet<'cfg> {

pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'cfg>> {
assert!(!self.downloading.replace(true));
let (save_work_tx, save_work_rx) = channel();
let (save_result_tx, save_result_rx) = channel();
thread::spawn(move || {
save_worker(save_work_rx, save_result_tx)
});
Ok(Downloads {
start: Instant::now(),
set: self,
Expand All @@ -339,6 +350,9 @@ impl<'cfg> PackageSet<'cfg> {
downloaded_bytes: 0,
largest: (0, String::new()),
success: false,
save_work_tx,
save_result_rx,
extracting: 0,
})
}

Expand Down Expand Up @@ -487,7 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {

/// Returns the number of crates that are still downloading
pub fn remaining(&self) -> usize {
self.pending.len()
self.pending.len() + self.extracting
}

/// Blocks the current thread waiting for a package to finish downloading.
Expand All @@ -499,6 +513,37 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
///
/// This function will panic if there are no remaining downloads.
pub fn wait(&mut self) -> CargoResult<&'a Package> {
loop {
if let Some(pkg) = self.wait_try()? {
return Ok(pkg);
}
}
}

fn wait_try(&mut self) -> CargoResult<Option<&'a Package>> {
// Check if any packages have finished extracting on the background thread.
let save_result = if self.pending.is_empty() {
// Downloads are finished, just waiting for the final packages to
// finish extracting.
self.tick(WhyTick::Extracting)?;
Some(self.save_result_rx.recv()?)
} else {
self.save_result_rx.try_recv().ok()
};
if let Some(result) = save_result {
self.extracting -= 1;
let (pkg_id, path) = result?;
let mut sources = self.set.sources.borrow_mut();
let source = sources
.get_mut(pkg_id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", pkg_id)))?;

let pkg = source.finish_download(&pkg_id, path)?;
let slot = &self.set.packages[&pkg_id];
assert!(slot.fill(pkg).is_ok());
return Ok(Some(slot.borrow().unwrap()));
}

let (dl, data) = loop {
assert_eq!(self.pending.len(), self.pending_ids.len());
let (token, result) = self.wait_for_curl()?;
Expand Down Expand Up @@ -551,27 +596,26 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
self.largest = (dl.total.get(), dl.id.name().to_string());
}

// TODO update this
// We're about to synchronously extract the crate below. While we're
// doing that our download progress won't actually be updated, nor do we
// have a great view into the progress of the extraction. Let's prepare
// the user for this CPU-heavy step if it looks like it'll take some
// time to do so.
if dl.total.get() < ByteSize::kb(400).0 {
self.tick(WhyTick::DownloadFinished)?;
} else {
self.tick(WhyTick::Extracting(&dl.id.name()))?;
}
self.tick(WhyTick::DownloadFinished)?;

// Inform the original source that the download is finished which
// should allow us to actually get the package and fill it in now.
let mut sources = self.set.sources.borrow_mut();
let source = sources
.get_mut(dl.id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
let pkg = source.finish_download(&dl.id, data)?;
let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Ok(slot.borrow().unwrap())
// Add this package to the queue of packages to be extracted by the
// background thread.
{
let mut sources = self.set.sources.borrow_mut();
let source = sources
.get_mut(dl.id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
let (src_path, tarball) = source.save_download(&dl.id, data)?;
self.extracting += 1;
self.save_work_tx.send((dl.id.clone(), src_path, tarball))?;
}
Ok(None)
}

fn enqueue(&mut self, dl: Download, handle: Easy) -> CargoResult<()> {
Expand Down Expand Up @@ -633,10 +677,9 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
return Ok(())
}
}
let mut msg = format!("{} crates", self.pending.len());
match why {
WhyTick::Extracting(krate) => {
msg.push_str(&format!(", extracting {} ...", krate));
WhyTick::Extracting => {
progress.print_now("(extracting...)")
}
_ => {
let mut dur = Duration::new(0, 0);
Expand All @@ -650,20 +693,43 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
remaining += dl.total.get() - dl.current.get();
}
}
let mut msg = format!("{} crates", self.pending.len());
if remaining > 0 && dur > Duration::from_millis(500) {
msg.push_str(&format!(", remaining bytes: {}", ByteSize(remaining)));
}
progress.print_now(&msg)
}
}
progress.print_now(&msg)
}
}

enum WhyTick<'a> {
fn save_worker(
save_work_rx: Receiver<(PackageId, Filesystem, FileLock)>,
save_result_tx: Sender<CargoResult<(PackageId, FileLock)>>,
) {
loop {
if let Ok((pkg, src_path, tarball)) = save_work_rx.recv() {
save_result_tx.send(extract_package(pkg, src_path, tarball)).unwrap();
} else {
return;
}
}
}

fn extract_package(
pkg: PackageId,
src_path: Filesystem,
tarball: FileLock,
) -> CargoResult<(PackageId, FileLock)> {
unpack_package(&src_path, &pkg, &tarball)?;
Ok((pkg, tarball))
}

enum WhyTick {
DownloadStarted,
DownloadUpdate,
DownloadFinished,
Extracting(&'a str),
Extracting,
}

impl<'a, 'cfg> Drop for Downloads<'a, 'cfg> {
Expand Down
21 changes: 15 additions & 6 deletions src/cargo/core/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::hash_map::HashMap;
use std::fmt;

use core::{Dependency, Package, PackageId, Summary};
use util::CargoResult;
use util::{CargoResult, FileLock, Filesystem};

mod source_id;

Expand Down Expand Up @@ -51,7 +51,8 @@ pub trait Source {
/// version specified.
fn download(&mut self, package: &PackageId) -> CargoResult<MaybePackage>;

fn finish_download(&mut self, package: &PackageId, contents: Vec<u8>) -> CargoResult<Package>;
fn save_download(&mut self, package: &PackageId, contents: Vec<u8>) -> CargoResult<(Filesystem, FileLock)>;
fn finish_download(&mut self, package: &PackageId, path: FileLock) -> CargoResult<Package>;

/// Generates a unique string which represents the fingerprint of the
/// current state of the source.
Expand Down Expand Up @@ -131,8 +132,12 @@ impl<'a, T: Source + ?Sized + 'a> Source for Box<T> {
(**self).download(id)
}

fn finish_download(&mut self, id: &PackageId, data: Vec<u8>) -> CargoResult<Package> {
(**self).finish_download(id, data)
fn save_download(&mut self, package: &PackageId, contents: Vec<u8>) -> CargoResult<(Filesystem, FileLock)> {
(**self).save_download(package, contents)
}

fn finish_download(&mut self, package: &PackageId, path: FileLock) -> CargoResult<Package> {
(**self).finish_download(package, path)
}

/// Forwards to `Source::fingerprint`
Expand Down Expand Up @@ -187,8 +192,12 @@ impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T {
(**self).download(id)
}

fn finish_download(&mut self, id: &PackageId, data: Vec<u8>) -> CargoResult<Package> {
(**self).finish_download(id, data)
fn save_download(&mut self, package: &PackageId, contents: Vec<u8>) -> CargoResult<(Filesystem, FileLock)> {
(**self).save_download(package, contents)
}

fn finish_download(&mut self, package: &PackageId, path: FileLock) -> CargoResult<Package> {
(**self).finish_download(package, path)
}

fn fingerprint(&self, pkg: &Package) -> CargoResult<String> {
Expand Down
8 changes: 6 additions & 2 deletions src/cargo/sources/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde_json;
use core::{Dependency, Package, PackageId, Source, SourceId, Summary};
use core::source::MaybePackage;
use sources::PathSource;
use util::{Config, Sha256};
use util::{Config, FileLock, Filesystem, Sha256};
use util::errors::{CargoResult, CargoResultExt};
use util::paths;

Expand Down Expand Up @@ -160,7 +160,11 @@ impl<'cfg> Source for DirectorySource<'cfg> {
.ok_or_else(|| format_err!("failed to find package with id: {}", id))
}

fn finish_download(&mut self, _id: &PackageId, _data: Vec<u8>) -> CargoResult<Package> {
fn save_download(&mut self, _package: &PackageId, _contents: Vec<u8>) -> CargoResult<(Filesystem, FileLock)> {
panic!("no downloads to do")
}

fn finish_download(&mut self, _package: &PackageId, _path: FileLock) -> CargoResult<Package> {
panic!("no downloads to do")
}

Expand Down
8 changes: 6 additions & 2 deletions src/cargo/sources/git/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use url::Url;
use core::source::{Source, SourceId, MaybePackage};
use core::GitReference;
use core::{Dependency, Package, PackageId, Summary};
use util::Config;
use util::{Config, FileLock, Filesystem};
use util::errors::CargoResult;
use util::hex::short_hash;
use sources::PathSource;
Expand Down Expand Up @@ -222,7 +222,11 @@ impl<'cfg> Source for GitSource<'cfg> {
.download(id)
}

fn finish_download(&mut self, _id: &PackageId, _data: Vec<u8>) -> CargoResult<Package> {
fn save_download(&mut self, _package: &PackageId, _contents: Vec<u8>) -> CargoResult<(Filesystem, FileLock)> {
panic!("no downloads to do")
}

fn finish_download(&mut self, _package: &PackageId, _path: FileLock) -> CargoResult<Package> {
panic!("no download should have started")
}

Expand Down
8 changes: 6 additions & 2 deletions src/cargo/sources/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ignore::gitignore::GitignoreBuilder;
use core::{Dependency, Package, PackageId, Source, SourceId, Summary};
use core::source::MaybePackage;
use ops;
use util::{self, internal, CargoResult};
use util::{self, internal, CargoResult, FileLock, Filesystem};
use util::paths;
use util::Config;

Expand Down Expand Up @@ -550,7 +550,11 @@ impl<'cfg> Source for PathSource<'cfg> {
.ok_or_else(|| internal(format!("failed to find {} in path source", id)))
}

fn finish_download(&mut self, _id: &PackageId, _data: Vec<u8>) -> CargoResult<Package> {
fn save_download(&mut self, _package: &PackageId, _contents: Vec<u8>) -> CargoResult<(Filesystem, FileLock)> {
panic!("no download should have started")
}

fn finish_download(&mut self, _package: &PackageId, _path: FileLock) -> CargoResult<Package> {
panic!("no download should have started")
}

Expand Down
2 changes: 1 addition & 1 deletion src/cargo/sources/registry/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl<'cfg> RegistryData for LocalRegistry<'cfg> {
Ok(MaybeLock::Ready(crate_file))
}

fn finish_download(&mut self, _pkg: &PackageId, _checksum: &str, _data: &[u8])
fn save_download(&mut self, _pkg: &PackageId, _checksum: &str, _data: &[u8])
-> CargoResult<FileLock>
{
panic!("this source doesn't download")
Expand Down
Loading

0 comments on commit 872bcb6

Please sign in to comment.