Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
bug: add timeouts for reqwest calls (#355)
Browse files Browse the repository at this point in the history
* bug: add timeouts for reqwest calls

This uses new `mozilla-services/cloud-storage-rs *::*_with()` calls to
pass a `reqwest::Client` object with timeouts.

Closes: #354
  • Loading branch information
jrconlin authored Feb 16, 2022
1 parent 80d7dca commit ac91d92
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 113 deletions.
305 changes: 226 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ actix-cors = "0.5"
actix-http = "2"
actix-rt = "1" # 2+ breaks testing, May need actix-web 4+?
actix-web = "3"
actix-web-location = { version = "0.3", features = ["maxmind", "cadence"] }
actix-web-location = { version = "0.5", features = ["actix-web-v3", "maxmind", "cadence"] }
async-trait = "0.1"
backtrace = "0.3"
base64 = "0.13"
Expand All @@ -35,7 +35,7 @@ futures = "0.3"
gethostname = "0.2.1"
hex = "0.4"
hostname = "0.3"
image = "0.23"
image = "0.24"
lazy_static = "1.4"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_info"] }
rand ="0.8"
Expand Down
34 changes: 21 additions & 13 deletions src/adm/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
fmt::Debug,
iter::FromIterator,
sync::{Arc, RwLock},
time::Duration,
};

use actix_http::http::Uri;
Expand Down Expand Up @@ -58,7 +59,9 @@ pub struct AdmFilter {
pub source: String,
pub source_url: Option<url::Url>,
pub last_updated: Option<chrono::DateTime<chrono::Utc>>,
pub refresh_rate: std::time::Duration,
pub refresh_rate: Duration,
pub connect_timeout: Duration,
pub request_timeout: Duration,
}

/// Parse &str into a `Url`
Expand Down Expand Up @@ -101,7 +104,7 @@ fn check_url(url: Url, species: &'static str, filter: &[Vec<String>]) -> Handler
Err(HandlerErrorKind::UnexpectedHost(species, host).into())
}

pub fn spawn_updater(filter: &Arc<RwLock<AdmFilter>>) {
pub fn spawn_updater(filter: &Arc<RwLock<AdmFilter>>, req: reqwest::Client) {
if !filter.read().unwrap().is_cloud() {
return;
}
Expand All @@ -110,7 +113,7 @@ pub fn spawn_updater(filter: &Arc<RwLock<AdmFilter>>) {
let tags = crate::tags::Tags::default();
loop {
let mut filter = mfilter.write().unwrap();
match filter.requires_update().await {
match filter.requires_update(&req).await {
Ok(true) => filter.update().await.unwrap_or_else(|e| {
filter.report(&e, &tags);
}),
Expand Down Expand Up @@ -144,7 +147,7 @@ impl AdmFilter {
}

/// check to see if the bucket has been modified since the last time we updated.
pub async fn requires_update(&self) -> HandlerResult<bool> {
pub async fn requires_update(&self, req: &reqwest::Client) -> HandlerResult<bool> {
// don't update non-bucket versions (for now)
if !self.is_cloud() {
return Ok(false);
Expand All @@ -157,7 +160,8 @@ impl AdmFilter {
})?
.to_string();
let obj =
cloud_storage::Object::read(&host, bucket.path().trim_start_matches('/')).await?;
cloud_storage::Object::read_with(&host, bucket.path().trim_start_matches('/'), req)
.await?;
if let Some(updated) = self.last_updated {
// if the bucket is older than when we last checked, do nothing.
return Ok(updated <= obj.updated);
Expand All @@ -170,14 +174,18 @@ impl AdmFilter {
/// Try to update the ADM filter data from the remote bucket.
pub async fn update(&mut self) -> HandlerResult<()> {
if let Some(bucket) = &self.source_url {
let adm_settings = AdmFilterSettings::from_settings_bucket(bucket)
.await
.map_err(|e| {
HandlerError::internal(&format!(
"Invalid bucket data in {:?}: {:?}",
self.source, e
))
})?;
let adm_settings = AdmFilterSettings::from_settings_bucket(
bucket,
self.connect_timeout,
self.request_timeout,
)
.await
.map_err(|e| {
HandlerError::internal(&format!(
"Invalid bucket data in {:?}: {:?}",
self.source, e
))
})?;
for (adv, setting) in adm_settings.advertisers {
if setting.delete {
trace!("Removing advertiser {:?}", &adv);
Expand Down
13 changes: 12 additions & 1 deletion src/adm/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ impl AdmFilterSettings {
/// Try to fetch the ADM settings from a Google Storage bucket url.
pub async fn from_settings_bucket(
settings_bucket: &url::Url,
connection_timeout: std::time::Duration,
request_timeout: std::time::Duration,
) -> Result<AdmFilterSettings, ConfigError> {
let settings_str = settings_bucket.as_str();
if settings_bucket.scheme() != "gs" {
Expand All @@ -307,7 +309,12 @@ impl AdmFilterSettings {
})?
.to_string();
let path = settings_bucket.path().trim_start_matches('/');
let contents = cloud_storage::Object::download(&bucket_name, path)
let req = reqwest::Client::builder()
.connect_timeout(connection_timeout)
.timeout(request_timeout)
.build()
.map_err(|e| ConfigError::Message(e.to_string()))?;
let contents = cloud_storage::Object::download_with(&bucket_name, path, &req)
.await
.map_err(|e| ConfigError::Message(format!("Could not download settings: {:?}", e)))?;
let mut reply =
Expand Down Expand Up @@ -401,6 +408,8 @@ impl From<&mut Settings> for HandlerResult<AdmFilter> {
.to_lowercase();
let mut all_include_regions = HashSet::new();
let source = settings.adm_settings.clone();
let connect_timeout = settings.connect_timeout;
let request_timeout = settings.request_timeout;
let source_url = match source.parse::<url::Url>() {
Ok(v) => Some(v),
Err(e) => {
Expand Down Expand Up @@ -438,6 +447,8 @@ impl From<&mut Settings> for HandlerResult<AdmFilter> {
source,
source_url,
refresh_rate: std::time::Duration::from_secs(refresh_rate),
connect_timeout: std::time::Duration::from_secs(connect_timeout),
request_timeout: std::time::Duration::from_secs(request_timeout),
})
}
}
Expand Down
38 changes: 23 additions & 15 deletions src/server/img_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub struct StorageSettings {
metrics: ImageMetricSettings,
/// Max request time (in seconds)
request_timeout: u64,
/// Max connection timeout (in seconds)
connection_timeout: u64,
}

/// Instantiate from [Settings]
Expand Down Expand Up @@ -93,13 +95,17 @@ impl Default for StorageSettings {
cache_ttl: 86400 * 15,
metrics: ImageMetricSettings::default(),
request_timeout: 3,
connection_timeout: 3,
}
}
}

/// Image storage container
#[derive(Clone)]
pub struct StoreImage {
// No `Default` stated for `StoreImage` because we *ALWAYS* want a timeout
// for the `reqwest::Client`
//
// bucket isn't really needed here, since `Object` stores and manages itself,
// but it may prove useful in future contexts.
//
Expand All @@ -108,15 +114,6 @@ pub struct StoreImage {
req: reqwest::Client,
}

impl Default for StoreImage {
fn default() -> Self {
Self {
settings: StorageSettings::default(),
req: reqwest::Client::new(),
}
}
}

/// Stored image information, suitable for determining the URL to present to the CDN
#[derive(Debug)]
pub struct StoredImage {
Expand Down Expand Up @@ -173,8 +170,7 @@ impl StoreImage {
// to public view.
//

// verify that the bucket can be read
let _content = Bucket::read(&settings.bucket_name)
let _content = Bucket::read_with(&settings.bucket_name, client)
.await
.map_err(|e| HandlerError::internal(&format!("Could not read bucket {:?}", e)))?;

Expand Down Expand Up @@ -344,7 +340,8 @@ impl StoreImage {

// check to see if image has already been stored.
if let Ok(exists) =
cloud_storage::Object::read(&self.settings.bucket_name, &image_path).await
cloud_storage::Object::read_with(&self.settings.bucket_name, &image_path, &self.req)
.await
{
trace!("Found existing image in bucket: {:?}", &exists.media_link);
return Ok(StoredImage {
Expand All @@ -361,6 +358,7 @@ impl StoreImage {
&image_path,
content_type,
Some(&[("ifGenerationMatch", "0")]),
Some(self.req.clone()),
)
.await
{
Expand Down Expand Up @@ -476,7 +474,10 @@ mod tests {
let src_img = "https://evilonastick.com/test/128px.jpg";

let test_settings = test_storage_settings();
let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(test_settings.request_timeout))
.build()
.unwrap();
let bucket = StoreImage::check_bucket(&test_settings, &client)
.await
.unwrap()
Expand All @@ -492,9 +493,13 @@ mod tests {
let test_valid_image = test_image_buffer(96, 96);
let test_uri: Uri = "https://example.com/test.jpg".parse().unwrap();
let test_settings = test_storage_settings();
let timeout = Duration::from_secs(test_settings.request_timeout);
let bucket = StoreImage {
settings: test_settings,
req: reqwest::Client::new(),
req: reqwest::Client::builder()
.connect_timeout(timeout)
.build()
.unwrap(),
};

let result = bucket
Expand All @@ -514,7 +519,10 @@ mod tests {
let test_uri: Uri = "https://example.com/test.jpg".parse().unwrap();
let bucket = StoreImage {
settings: test_storage_settings(),
req: reqwest::Client::new(),
req: reqwest::Client::builder()
.connect_timeout(Duration::from_secs(3))
.build()
.unwrap(),
};

assert!(bucket
Expand Down
5 changes: 3 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ impl Server {
raw_filter.update().await?
}
let filter = Arc::new(RwLock::new(raw_filter));
spawn_updater(&filter);
let tiles_cache = cache::TilesCache::new(TILES_CACHE_INITIAL_CAPACITY);
let req = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(settings.connect_timeout))
.timeout(Duration::from_secs(settings.request_timeout))
.user_agent(REQWEST_USER_AGENT)
.build()?;
spawn_updater(&filter, req.clone());
let tiles_cache = cache::TilesCache::new(TILES_CACHE_INITIAL_CAPACITY);
let img_store = StoreImage::create(&settings, &req).await?;
let excluded_dmas = if let Some(exclude_dmas) = &settings.exclude_dma {
serde_json::from_str(exclude_dmas).map_err(|e| {
Expand Down
3 changes: 3 additions & 0 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct Settings {
pub exclude_dma: Option<String>,
/// Timeout (in seconds) for only the connect phase of all outbound HTTP requests
pub connect_timeout: u64,
/// default total request timeout (in seconds)
pub request_timeout: u64,
/// Whether excluded countries recieve empty tile responses via an HTTP 200
/// status code or 204s when disabled. See
/// https://github.com/mozilla-services/contile/issues/284
Expand Down Expand Up @@ -153,6 +155,7 @@ impl Default for Settings {
// exclude for: Glendive, MT(798); Alpena, MI(583); North Platte, NE (740)
exclude_dma: Some("[798, 583, 740]".to_owned()),
connect_timeout: 2,
request_timeout: 5,
excluded_countries_200: true,
// ADM specific settings
adm_endpoint_url: "".to_owned(),
Expand Down
6 changes: 5 additions & 1 deletion src/web/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use actix_cors::Cors;
use actix_web::{
Expand Down Expand Up @@ -58,7 +59,10 @@ macro_rules! init_app {
};
let state = ServerState {
metrics: Box::new(metrics.clone()),
reqwest_client: reqwest::Client::new(),
reqwest_client: reqwest::Client::builder()
.connect_timeout(Duration::from_secs(3))
.build()
.unwrap(),
tiles_cache: cache::TilesCache::new(10),
settings: $settings.clone(),
filter: Arc::new(RwLock::new(
Expand Down

0 comments on commit ac91d92

Please sign in to comment.