Skip to content

Commit

Permalink
Refactor download function and add backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
harrisonbarlow committed Aug 19, 2022
1 parent bb95496 commit 2ae181e
Showing 1 changed file with 81 additions and 66 deletions.
147 changes: 81 additions & 66 deletions src/asvo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ use std::time::Instant;

use log::{debug, info};
use reqwest::blocking::{Client, ClientBuilder};
use backoff::{retry, ExponentialBackoff, Error};
use sha1::{Digest, Sha1};
use tar::Archive;

use crate::obsid::Obsid;

use self::types::AsvoFilesArray;

/// The address of the MWA ASVO.
const ASVO_ADDRESS: &str = "https://asvo.mwatelescope.org:443";

Expand Down Expand Up @@ -146,13 +149,6 @@ impl AsvoClient {

/// Private function to actually do the work.
fn download(&self, job: &AsvoJob, keep_tar: bool, hash: bool) -> Result<(), AsvoError> {
// How big should our in-memory download buffer be [MiB]?
let buffer_size = match var("GIANT_SQUID_BUF_SIZE") {
Ok(s) => s.parse()?,
Err(_) => 100, // 100 MiB by default.
} * 1024
* 1024;

// Is the job ready to download?
if job.state != AsvoJobState::Ready {
return Err(AsvoError::NotReady {
Expand Down Expand Up @@ -191,66 +187,11 @@ impl AsvoClient {
Some(url) => {
debug!("Downloading file {:?}", &url);

// parse out path from url
let url_obj = reqwest::Url::parse(url).unwrap();
let out_path = url_obj.path_segments().unwrap().last().unwrap();

let response = self.client.get(url).send()?;
let mut tee = tee_readwrite::TeeReader::new(response, Sha1::new(), false);

if keep_tar {
// Simply dump the response to the appropriate file name. Use a
// buffer to avoid doing frequent writes.

let mut out_file = File::create(out_path)?;
let mut file_buf = BufReader::with_capacity(buffer_size, tee.by_ref());

loop {
let buffer = file_buf.fill_buf()?;
out_file.write_all(buffer)?;
let op = || {
self.try_download(url, keep_tar, hash, f, job).map_err(Error::transient)
};

let length = buffer.len();
file_buf.consume(length);
if length == 0 {
break;
}
}
} else {
// Stream-untar the response.
debug!("Attempting to untar stream");
let mut tar = Archive::new(&mut tee);
tar.unpack(".")?;
}

// If we were told to hash the download, compare our hash against
// the upstream hash. Stream untarring may not read all of the
// bytes; read the tee to the end.
{
let mut final_bytes = vec![];
tee.read_to_end(&mut final_bytes)?;
}

if hash {
match &f.sha1 {
Some(sha) => {
debug!("Upstream hash: {}", sha);
let (_, hasher) = tee.into_inner();
let hash = format!("{:x}", hasher.finalize());
debug!("Our hash: {}", &hash);
if !hash.eq_ignore_ascii_case(sha) {
return Err(AsvoError::HashMismatch {
jobid: job.jobid,
file: url.to_string(),
calculated_hash: hash,
expected_hash: sha.to_string(),
});
}
}
_ => {
panic!("Product does not include a hash to compare.")
}
}
}
let _ = retry(ExponentialBackoff::default(), op);

info!(
"Completed download in {} (average rate: {}/s)",
Expand Down Expand Up @@ -304,6 +245,80 @@ impl AsvoClient {
Ok(())
}

pub fn try_download(&self, url: &str, keep_tar: bool, hash: bool, f: &AsvoFilesArray, job: &AsvoJob) -> Result<(), AsvoError> {
// How big should our in-memory download buffer be [MiB]?
let buffer_size = match var("GIANT_SQUID_BUF_SIZE") {
Ok(s) => s.parse()?,
Err(_) => 100, // 100 MiB by default.
} * 1024
* 1024;

// parse out path from url
let url_obj = reqwest::Url::parse(url).unwrap();
let out_path = url_obj.path_segments().unwrap().last().unwrap();

let response = self.client.get(url).send()?;

let mut tee = tee_readwrite::TeeReader::new(response, Sha1::new(), false);

if keep_tar {
// Simply dump the response to the appropriate file name. Use a
// buffer to avoid doing frequent writes.

let mut out_file = File::create(out_path)?;
let mut file_buf = BufReader::with_capacity(buffer_size, tee.by_ref());

loop {
let buffer = file_buf.fill_buf()?;
out_file.write_all(buffer)?;

let length = buffer.len();
file_buf.consume(length);
if length == 0 {
break;
}
}
} else {
// Stream-untar the response.
debug!("Attempting to untar stream");
let mut tar = Archive::new(&mut tee);

tar.unpack(".")?;
}

// If we were told to hash the download, compare our hash against
// the upstream hash. Stream untarring may not read all of the
// bytes; read the tee to the end.
{
let mut final_bytes = vec![];
tee.read_to_end(&mut final_bytes)?;
}

if hash {
match &f.sha1 {
Some(sha) => {
debug!("Upstream hash: {}", sha);
let (_, hasher) = tee.into_inner();
let hash = format!("{:x}", hasher.finalize());
debug!("Our hash: {}", &hash);
if !hash.eq_ignore_ascii_case(sha) {
return Err(AsvoError::HashMismatch {
jobid: job.jobid,
file: url.to_string(),
calculated_hash: hash,
expected_hash: sha.to_string(),
});
}
}
_ => {
panic!("Product does not include a hash to compare.")
}
}
}

Ok(())
}

/// Submit an ASVO job for visibility download.
pub fn submit_vis(
&self,
Expand Down

0 comments on commit 2ae181e

Please sign in to comment.