Skip to content

Commit

Permalink
Auto merge of #6272 - alexcrichton:beta-net, r=ehuss
Browse files Browse the repository at this point in the history
[beta]: Fix timeouts firing with tarball extraction

This is a backport of #6130 to the 1.31.0 branch
  • Loading branch information
bors committed Nov 6, 2018
2 parents 522c83f + bfeafde commit 2c011df
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ crates-io = { path = "src/crates-io", version = "0.20" }
crossbeam-utils = "0.5"
crypto-hash = "0.3.1"
curl = { version = "0.4.17", features = ['http2'] }
curl-sys = "0.4.12"
env_logger = "0.5.11"
failure = "0.1.2"
filetime = "0.2"
Expand Down
171 changes: 150 additions & 21 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf};
use std::time::{Instant, Duration};

use bytesize::ByteSize;
use curl;
use curl_sys;
use curl::easy::{Easy, HttpVersion};
use curl::multi::{Multi, EasyHandle};
use lazycell::LazyCell;
Expand Down Expand Up @@ -255,11 +257,10 @@ pub struct PackageSet<'cfg> {

pub struct Downloads<'a, 'cfg: 'a> {
set: &'a PackageSet<'cfg>,
pending: HashMap<usize, (Download, EasyHandle)>,
pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
pending_ids: HashSet<PackageId>,
results: Vec<(usize, CargoResult<()>)>,
results: Vec<(usize, Result<(), curl::Error>)>,
next: usize,
retry: Retry<'cfg>,
progress: RefCell<Option<Progress<'cfg>>>,
downloads_finished: usize,
downloaded_bytes: u64,
Expand All @@ -268,15 +269,51 @@ pub struct Downloads<'a, 'cfg: 'a> {
success: bool,
}

struct Download {
struct Download<'cfg> {
/// Token for this download, used as the key of the `Downloads::pending` map
/// and stored in `EasyHandle` as well.
token: usize,

/// Package that we're downloading
id: PackageId,

/// Actual downloaded data, updated throughout the lifetime of this download
data: RefCell<Vec<u8>>,

/// The URL that we're downloading from, cached here for error messages and
/// reenqueuing.
url: String,

/// A descriptive string to print when we've finished downloading this crate
descriptor: String,

/// Statistics updated from the progress callback in libcurl
total: Cell<u64>,
current: Cell<u64>,

/// The moment we started this transfer at
start: Instant,

/// Last time we noticed that we got some more data from libcurl
updated_at: Cell<Instant>,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here because we have a
/// `Multi` with a lot of active transfers but between transfers finishing
/// we perform some possibly slow synchronous work (like grabbing file
/// locks, extracting tarballs, etc). The default timers on our `Multi` keep
/// running during this work, but we don't want them to count towards timing
/// everythig out. As a result, we manage this manually and take the time
/// for synchronous work into account manually.
timeout: ops::HttpTimeout,
timed_out: Cell<Option<String>>,
next_speed_check: Cell<Instant>,
next_speed_check_bytes_threshold: Cell<u64>,

/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
}

impl<'cfg> PackageSet<'cfg> {
Expand Down Expand Up @@ -329,7 +366,6 @@ impl<'cfg> PackageSet<'cfg> {
pending: HashMap::new(),
pending_ids: HashSet::new(),
results: Vec::new(),
retry: Retry::new(self.config)?,
progress: RefCell::new(Some(Progress::with_style(
"Downloading",
ProgressStyle::Ratio,
Expand Down Expand Up @@ -410,7 +446,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
debug!("downloading {} as {}", id, token);
assert!(self.pending_ids.insert(id.clone()));

let mut handle = ops::http_handle(self.set.config)?;
let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?; // follow redirects
Expand Down Expand Up @@ -448,14 +484,10 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
handle.progress(true)?;
handle.progress_function(move |dl_total, dl_cur, _, _| {
tls::with(|downloads| {
let downloads = match downloads {
Some(d) => d,
None => return false,
};
let dl = &downloads.pending[&token].0;
dl.total.set(dl_total as u64);
dl.current.set(dl_cur as u64);
downloads.tick(WhyTick::DownloadUpdate).is_ok()
match downloads {
Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
None => false,
}
})
})?;

Expand All @@ -469,6 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
self.set.config.shell().status("Downloading", "crates ...")?;
}

let now = Instant::now();
let dl = Download {
token,
data: RefCell::new(Vec::new()),
Expand All @@ -478,6 +511,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
total: Cell::new(0),
current: Cell::new(0),
start: Instant::now(),
updated_at: Cell::new(now),
timeout,
timed_out: Cell::new(None),
next_speed_check: Cell::new(now),
next_speed_check_bytes_threshold: Cell::new(0),
retry: Retry::new(self.set.config)?,
};
self.enqueue(dl, handle)?;
self.tick(WhyTick::DownloadStarted)?;
Expand Down Expand Up @@ -514,12 +553,35 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// then we want to re-enqueue our request for another attempt and
// then we wait for another request to finish.
let ret = {
self.retry.try(|| {
result?;
let timed_out = &dl.timed_out;
let url = &dl.url;
dl.retry.try(|| {
if let Err(e) = result {
// If this error is "aborted by callback" then that's
// probably because our progress callback aborted due to
// a timeout. We'll find out by looking at the
// `timed_out` field, looking for a descriptive message.
// If one is found we switch the error code (to ensure
// it's flagged as spurious) and then attach our extra
// information to the error.
if !e.is_aborted_by_callback() {
return Err(e.into())
}

return Err(match timed_out.replace(None) {
Some(msg) => {
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
let mut err = curl::Error::new(code);
err.set_extra(msg);
err
}
None => e,
}.into())
}

let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(&dl.url);
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNot200 {
code,
url: url.to_string(),
Expand Down Expand Up @@ -568,20 +630,39 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let source = sources
.get_mut(dl.id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
let start = Instant::now();
let pkg = source.finish_download(&dl.id, data)?;

// Assume that no time has passed while we were calling
// `finish_download`, update all speed checks and timeout limits of all
// active downloads to make sure they don't fire because of a slowly
// extracted tarball.
let finish_dur = start.elapsed();
for (dl, _) in self.pending.values_mut() {
dl.updated_at.set(dl.updated_at.get() + finish_dur);
dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur);
}

let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Ok(slot.borrow().unwrap())
}

fn enqueue(&mut self, dl: Download, handle: Easy) -> CargoResult<()> {
fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> {
let mut handle = self.set.multi.add(handle)?;
let now = Instant::now();
handle.set_token(dl.token)?;
dl.timed_out.set(None);
dl.updated_at.set(now);
dl.current.set(0);
dl.total.set(0);
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64);
self.pending.insert(dl.token, (dl, handle));
Ok(())
}

fn wait_for_curl(&mut self) -> CargoResult<(usize, CargoResult<()>)> {
fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
// This is the main workhorse loop. We use libcurl's portable `wait`
// method to actually perform blocking. This isn't necessarily too
// efficient in terms of fd management, but we should only be juggling
Expand Down Expand Up @@ -609,7 +690,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let token = msg.token().expect("failed to read token");
let handle = &pending[&token].1;
if let Some(result) = msg.result_for(&handle) {
results.push((token, result.map_err(|e| e.into())));
results.push((token, result));
} else {
debug!("message without a result (?)");
}
Expand All @@ -619,11 +700,59 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
break Ok(pair)
}
assert!(self.pending.len() > 0);
self.set.multi.wait(&mut [], Duration::new(60, 0))
let timeout = self.set.multi.get_timeout()?
.unwrap_or(Duration::new(5, 0));
self.set.multi.wait(&mut [], timeout)
.chain_err(|| "failed to wait on curl `Multi`")?;
}
}

fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
let dl = &self.pending[&token].0;
dl.total.set(total);
let now = Instant::now();
if cur != dl.current.get() {
dl.current.set(cur);
dl.updated_at.set(now);

if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() {
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(
dl.current.get() + dl.timeout.low_speed_limit as u64,
);
}
}
if !self.tick(WhyTick::DownloadUpdate).is_ok() {
return false
}

// If we've spent too long not actually receiving any data we time out.
if now - dl.updated_at.get() > dl.timeout.dur {
let msg = format!("failed to download any data for `{}` within {}s",
dl.id,
dl.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}

// If we reached the point in time that we need to check our speed
// limit, see if we've transferred enough data during this threshold. If
// it fails this check then we fail because the download is going too
// slowly.
if now >= dl.next_speed_check.get() {
assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get());
let msg = format!("download of `{}` failed to transfer more \
than {} bytes in {}s",
dl.id,
dl.timeout.low_speed_limit,
dl.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}

true
}

fn tick(&self, why: WhyTick) -> CargoResult<()> {
let mut progress = self.progress.borrow_mut();
let progress = progress.as_mut().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/cargo/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ extern crate core_foundation;
extern crate crates_io as registry;
extern crate crossbeam_utils;
extern crate curl;
extern crate curl_sys;
#[macro_use]
extern crate failure;
extern crate filetime;
Expand Down
3 changes: 2 additions & 1 deletion src/cargo/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub use self::cargo_package::{package, PackageOpts};
pub use self::registry::{publish, registry_configuration, RegistryConfig};
pub use self::registry::{http_handle, needs_custom_http_transport, registry_login, search};
pub use self::registry::{modify_owners, yank, OwnersOptions, PublishOpts};
pub use self::registry::configure_http_handle;
pub use self::registry::{configure_http_handle, http_handle_and_timeout};
pub use self::registry::HttpTimeout;
pub use self::cargo_fetch::{fetch, FetchOptions};
pub use self::cargo_pkgid::pkgid;
pub use self::resolve::{add_overrides, get_resolved_packages, resolve_with_previous, resolve_ws,
Expand Down
Loading

0 comments on commit 2c011df

Please sign in to comment.