Timeout batch downloads, not each download
This commit switches the timeout logic implemented in #6130 to timeout
an entire batch of downloads instead of each download individually.
Previously we would time out if *any* pending download received no data
within 30s, or if *any* pending download received fewer than 10 bytes
within 30s. On very slow network connections this is highly likely to
happen, as a trickle of incoming bytes may not be spread equally
amongst all connections, and not all connections may actually be active
at any one point in time.

The fix is to instead apply the timeout logic to an entire batch of
downloads. We only time out if no data at all is received within the
timeout window. In other words, if any data is received for any
download, the batch is not considered timed out. Similarly, progress on
any download counts as progress towards our speed limit.

Closes #6284
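
For reference, here is a minimal sketch of the batch-level timeout
bookkeeping described above, assuming a 30s window and a 10-byte
low-speed limit. The field names mirror those added to `Downloads` in
the diff below; the `BatchTimeout` type and its methods are purely
illustrative, not the actual Cargo API.

use std::cell::Cell;
use std::time::{Duration, Instant};

struct BatchTimeout {
    dur: Duration,                               // timeout window (e.g. 30s)
    low_speed_limit: u64,                        // bytes required per window (e.g. 10)
    updated_at: Cell<Instant>,                   // last time *any* download received bytes
    next_speed_check: Cell<Instant>,             // deadline for the current speed check
    next_speed_check_bytes_threshold: Cell<u64>, // decremented as bytes arrive
}

impl BatchTimeout {
    fn new(dur: Duration, low_speed_limit: u64) -> BatchTimeout {
        let now = Instant::now();
        BatchTimeout {
            dur,
            low_speed_limit,
            updated_at: Cell::new(now),
            next_speed_check: Cell::new(now + dur),
            next_speed_check_bytes_threshold: Cell::new(low_speed_limit),
        }
    }

    // Called whenever *any* pending download receives `delta` new bytes.
    fn data_received(&self, delta: u64, now: Instant) {
        self.updated_at.set(now);
        let threshold = self.next_speed_check_bytes_threshold.get();
        if delta >= threshold {
            // Enough total progress across the batch: restart the window.
            self.next_speed_check.set(now + self.dur);
            self.next_speed_check_bytes_threshold.set(self.low_speed_limit);
        } else {
            self.next_speed_check_bytes_threshold.set(threshold - delta);
        }
    }

    // The batch, not any single download, is considered stalled only if no
    // data at all arrived for a full window, or the speed-check deadline
    // passed without the byte threshold being met.
    fn timed_out(&self, now: Instant) -> bool {
        now - self.updated_at.get() > self.dur || now >= self.next_speed_check.get()
    }
}

fn main() {
    let batch = BatchTimeout::new(Duration::from_secs(30), 10);
    batch.data_received(4, Instant::now()); // 4 bytes on one download
    batch.data_received(7, Instant::now()); // 7 bytes on another: threshold met, window resets
    assert!(!batch.timed_out(Instant::now()));
}

Because `data_received` is fed by every pending transfer, a trickle of
bytes on any single connection keeps the whole batch alive, which is
exactly the behavior change above.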
alexcrichton committed Nov 8, 2018
1 parent 1ffd248 commit 4e1e3f7
Showing 1 changed file with 40 additions and 41 deletions.
81 changes: 40 additions & 41 deletions src/cargo/core/package.rs
@@ -267,6 +267,17 @@ pub struct Downloads<'a, 'cfg: 'a> {
largest: (u64, String),
start: Instant,
success: bool,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here instead of in libcurl
/// because we want to apply timeouts to an entire batch of operations, not
/// any one particular operation.
timeout: ops::HttpTimeout, // timeout configuration
updated_at: Cell<Instant>, // last time we received bytes
next_speed_check: Cell<Instant>, // if threshold isn't 0 by this time, error
next_speed_check_bytes_threshold: Cell<u64>, // decremented when we receive bytes
}

struct Download<'cfg> {
@@ -293,24 +304,7 @@ struct Download<'cfg> {

/// The moment we started this transfer at
start: Instant,

/// Last time we noticed that we got some more data from libcurl
updated_at: Cell<Instant>,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here because we have a
/// `Multi` with a lot of active transfers but between transfers finishing
/// we perform some possibly slow synchronous work (like grabbing file
/// locks, extracting tarballs, etc). The default timers on our `Multi` keep
/// running during this work, but we don't want them to count towards timing
/// everything out. As a result, we manage this manually and take the time
/// for synchronous work into account manually.
timeout: ops::HttpTimeout,
timed_out: Cell<Option<String>>,
next_speed_check: Cell<Instant>,
next_speed_check_bytes_threshold: Cell<u64>,

/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
@@ -359,6 +353,7 @@ impl<'cfg> PackageSet<'cfg> {

pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'cfg>> {
assert!(!self.downloading.replace(true));
let timeout = ops::HttpTimeout::new(self.config)?;
Ok(Downloads {
start: Instant::now(),
set: self,
@@ -375,6 +370,10 @@ impl<'cfg> PackageSet<'cfg> {
downloaded_bytes: 0,
largest: (0, String::new()),
success: false,
updated_at: Cell::new(Instant::now()),
timeout,
next_speed_check: Cell::new(Instant::now()),
next_speed_check_bytes_threshold: Cell::new(0),
})
}

@@ -446,7 +445,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
debug!("downloading {} as {}", id, token);
assert!(self.pending_ids.insert(id.clone()));

let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?;
let (mut handle, _timeout) = ops::http_handle_and_timeout(self.set.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?; // follow redirects
@@ -501,7 +500,6 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
self.set.config.shell().status("Downloading", "crates ...")?;
}

let now = Instant::now();
let dl = Download {
token,
data: RefCell::new(Vec::new()),
@@ -511,11 +509,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
total: Cell::new(0),
current: Cell::new(0),
start: Instant::now(),
updated_at: Cell::new(now),
timeout,
timed_out: Cell::new(None),
next_speed_check: Cell::new(now),
next_speed_check_bytes_threshold: Cell::new(0),
retry: Retry::new(self.set.config)?,
};
self.enqueue(dl, handle)?;
@@ -638,10 +632,8 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// active downloads to make sure they don't fire because of a slowly
// extracted tarball.
let finish_dur = start.elapsed();
for (dl, _) in self.pending.values_mut() {
dl.updated_at.set(dl.updated_at.get() + finish_dur);
dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur);
}
self.updated_at.set(self.updated_at.get() + finish_dur);
self.next_speed_check.set(self.next_speed_check.get() + finish_dur);

let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
@@ -652,12 +644,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let mut handle = self.set.multi.add(handle)?;
let now = Instant::now();
handle.set_token(dl.token)?;
self.updated_at.set(now);
self.next_speed_check.set(now + self.timeout.dur);
self.next_speed_check_bytes_threshold.set(self.timeout.low_speed_limit as u64);
dl.timed_out.set(None);
dl.updated_at.set(now);
dl.current.set(0);
dl.total.set(0);
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64);
self.pending.insert(dl.token, (dl, handle));
Ok(())
}
@@ -712,25 +704,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
dl.total.set(total);
let now = Instant::now();
if cur != dl.current.get() {
let delta = cur - dl.current.get();
let threshold = self.next_speed_check_bytes_threshold.get();

dl.current.set(cur);
dl.updated_at.set(now);
self.updated_at.set(now);

if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() {
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(
dl.current.get() + dl.timeout.low_speed_limit as u64,
if delta >= threshold {
self.next_speed_check.set(now + self.timeout.dur);
self.next_speed_check_bytes_threshold.set(
self.timeout.low_speed_limit as u64,
);
} else {
self.next_speed_check_bytes_threshold.set(threshold - delta);
}
}
if !self.tick(WhyTick::DownloadUpdate).is_ok() {
return false
}

// If we've spent too long not actually receiving any data we time out.
if now - dl.updated_at.get() > dl.timeout.dur {
if now - self.updated_at.get() > self.timeout.dur {
self.updated_at.set(now);
let msg = format!("failed to download any data for `{}` within {}s",
dl.id,
dl.timeout.dur.as_secs());
self.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}
@@ -739,13 +737,14 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// limit, see if we've transferred enough data during this threshold. If
// it fails this check then we fail because the download is going too
// slowly.
if now >= dl.next_speed_check.get() {
assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get());
if now >= self.next_speed_check.get() {
self.next_speed_check.set(now + self.timeout.dur);
assert!(self.next_speed_check_bytes_threshold.get() > 0);
let msg = format!("download of `{}` failed to transfer more \
than {} bytes in {}s",
dl.id,
dl.timeout.low_speed_limit,
dl.timeout.dur.as_secs());
self.timeout.low_speed_limit,
self.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}
