This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

feat: move cleanup of temp cache states into a ScopeGuard (#351)
and kill TilesCache's Deref to dashmap, since we now only expose get and insert
outside of the cache module

Closes #342

Co-authored-by: JR Conlin <jr+git@mozilla.com>
pjenvey and jrconlin authored Feb 4, 2022
1 parent fe2c325 commit 9122ec0
Showing 5 changed files with 128 additions and 90 deletions.
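For context, the core idiom this commit adopts is scopeguard's arm-then-defuse pattern: a cleanup closure is armed before a fallible write and cancelled only once the write succeeds. A minimal, self-contained sketch of that idiom, assuming the scopeguard 1.x API (the function name and messages are illustrative, not taken from the diff):

// Sketch of the arm/defuse idiom from the `scopeguard` crate (1.x API assumed).
fn do_write(succeed: bool) {
    // Arm the cleanup. With the default strategy it runs on any exit from this
    // scope, including early returns and panics, unless explicitly defused.
    let guard = scopeguard::guard((), |_| {
        println!("cleanup: resetting temporary state");
    });

    if !succeed {
        // Error path: the guard is dropped here and the cleanup runs.
        return;
    }

    // Happy path: the write completed, so cancel the cleanup.
    scopeguard::ScopeGuard::into_inner(guard);
    println!("write committed, cleanup defused");
}

fn main() {
    do_write(true);  // prints "write committed, cleanup defused"
    do_write(false); // prints "cleanup: resetting temporary state"
}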
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -47,6 +47,7 @@ serde = "1.0"
sentry = "0.19"
sentry-backtrace = "0.19"
serde_json = "1.0"
scopeguard = "1.1.0"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_info", "dynamic-keys"] }
slog-async = "2.6"
slog-envlogger = "2.2.0"
2 changes: 1 addition & 1 deletion src/adm/tiles.rs
@@ -135,7 +135,7 @@ pub async fn get_tiles(
tags: &mut Tags,
metrics: &Metrics,
headers: Option<&HeaderMap>,
) -> Result<TileResponse, HandlerError> {
) -> HandlerResult<TileResponse> {
let settings = &state.settings;
let image_store = &state.img_store;
let adm_url = Url::parse_with_params(
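The signature change above swaps the explicit Result<TileResponse, HandlerError> for the crate's HandlerResult alias imported from the error module. The alias itself is not shown in this diff; it presumably has roughly this shape:

// Assumed definition; the real alias lives in the crate's error module.
pub type HandlerResult<T> = Result<T, HandlerError>;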
89 changes: 83 additions & 6 deletions src/server/cache.rs
@@ -1,7 +1,6 @@
//! Tile cache manager
use std::{
fmt::Debug,
ops::Deref,
sync::Arc,
time::{Duration, SystemTime},
};
@@ -56,13 +55,91 @@ impl TilesCache {
}
});
}

/// Get an immutable reference to an entry in the cache
pub fn get(
&self,
audience_key: &AudienceKey,
) -> Option<dashmap::mapref::one::Ref<'_, AudienceKey, TilesState>> {
self.inner.get(audience_key)
}

/// Prepare to write to the cache.
///
/// Sets the cache entry to the Refreshing/Populating states.
/// `WriteHandle` resets those states when it goes out of scope if no
/// `insert` call was issued (due to errors or panics).
pub fn prepare_write<'a>(
&'a self,
audience_key: &'a AudienceKey,
expired: bool,
) -> WriteHandle<'a, impl FnOnce(()) + '_> {
if expired {
// The cache entry's expired and we're about to refresh it
trace!("prepare_write: Fresh now expired, Refreshing");
self.inner
.alter(audience_key, |_, tiles_state| match tiles_state {
TilesState::Fresh { tiles } if tiles.expired() => {
TilesState::Refreshing { tiles }
}
_ => tiles_state,
});
} else {
// We'll populate this cache entry for probably the first time
trace!("prepare_write: Populating");
self.inner
.insert(audience_key.clone(), TilesState::Populating);
};

let guard = scopeguard::guard((), move |_| {
trace!("prepare_write (ScopeGuard cleanup): Resetting state");
if expired {
// Back to Fresh (though the tiles are expired): so a later request
// will retry refreshing again
self.inner
.alter(audience_key, |_, tiles_state| match tiles_state {
TilesState::Refreshing { tiles } => TilesState::Fresh { tiles },
_ => tiles_state,
});
} else {
// Clear the entry: a later request will retry populating again
self.inner.remove_if(audience_key, |_, tiles_state| {
matches!(tiles_state, TilesState::Populating)
});
}
});
WriteHandle {
cache: self,
audience_key,
guard,
}
}
}

impl Deref for TilesCache {
type Target = Arc<DashMap<AudienceKey, TilesState>>;
/// Manages a write to a specific `TilesCache` entry.
///
/// This will reset the temporary state set by `prepare_write` when it's gone
/// out of scope and no `insert` was issued (e.g. in the case of errors or
/// panics).
pub struct WriteHandle<'a, F>
where
F: FnOnce(()),
{
cache: &'a TilesCache,
audience_key: &'a AudienceKey,
guard: scopeguard::ScopeGuard<(), F>,
}

fn deref(&self) -> &Self::Target {
&self.inner
impl<F> WriteHandle<'_, F>
where
F: FnOnce(()),
{
/// Insert a value into the cache for our audience_key
pub fn insert(self, tiles: TilesState) {
self.cache.inner.insert(self.audience_key.clone(), tiles);
// With the write completed cancel scopeguard's cleanup
scopeguard::ScopeGuard::into_inner(self.guard);
trace!("WriteHandle: ScopeGuard defused (cancelled)");
}
}

@@ -140,7 +217,7 @@ async fn tiles_cache_garbage_collect(cache: &TilesCache, metrics: &Metrics) {
// calculate the size and GC (for seldomly used Tiles) while we're at it
let mut cache_count = 0;
let mut cache_size = 0;
for refm in cache.iter() {
for refm in cache.inner.iter() {
cache_count += 1;
cache_size += refm.value().size();
}
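Taken together, the new cache surface is a prepare/insert pair: a caller obtains a WriteHandle before doing a fallible fetch, and only a successful insert defuses the ScopeGuard cleanup. A simplified caller sketch against the types shown above (fetch_tiles and MyError are hypothetical placeholders; this snippet is not part of the diff):

// Hypothetical caller of the new API; `fetch_tiles` and `MyError` are placeholders.
async fn refresh_entry(
    cache: &TilesCache,
    audience_key: &AudienceKey,
    expired: bool,
) -> Result<(), MyError> {
    // Marks the entry Refreshing/Populating and arms the ScopeGuard cleanup.
    let handle = cache.prepare_write(audience_key, expired);

    // If this returns Err (or panics), `handle` is dropped without an insert
    // and the guard resets the temporary state so a later request can retry.
    let tiles = fetch_tiles(audience_key).await?;

    // Success: store the fresh entry and defuse the guard.
    handle.insert(TilesState::Fresh { tiles });
    Ok(())
}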
125 changes: 42 additions & 83 deletions src/web/handlers.rs
@@ -6,7 +6,7 @@ use rand::{thread_rng, Rng};

use crate::{
adm,
error::{HandlerError, HandlerErrorKind},
error::{HandlerErrorKind, HandlerResult},
metrics::Metrics,
server::{
cache::{self, Tiles, TilesState},
@@ -43,7 +43,7 @@ pub async fn get_tiles(
metrics: Metrics,
state: web::Data<ServerState>,
request: HttpRequest,
) -> Result<HttpResponse, HandlerError> {
) -> HandlerResult<HttpResponse> {
trace!("get_tiles");
metrics.incr("tiles.get");

@@ -127,22 +127,13 @@ pub async fn get_tiles(

// Alter the cache separately from the read above: writes are more
// expensive and these alterations occur infrequently
if expired {
// The cache entry's expired and we're about to refresh it
trace!("get_tiles: Fresh now expired, Refreshing");
state
.tiles_cache
.alter(&audience_key, |_, tiles_state| match tiles_state {
TilesState::Fresh { tiles } if tiles.expired() => TilesState::Refreshing { tiles },
_ => tiles_state,
});
} else {
// We'll populate this cache entry for probably the first time
trace!("get_tiles: Populating");
state
.tiles_cache
.insert(audience_key.clone(), TilesState::Populating);
};

// Prepare to write: temporarily set the cache entry to
// Refreshing/Populating until we've completed our write, notifying other
// requests in flight during this time to return stale data/204 No Content
// instead of making duplicate/redundant writes. The handle will reset the
// temporary state if no write occurs (due to errors/panics)
let handle = state.tiles_cache.prepare_write(&audience_key, expired);

let result = adm::get_tiles(
&state,
@@ -159,75 +150,43 @@
)
.await;

let handle_result = || {
match result {
Ok(response) => {
let tiles = cache::Tiles::new(response, add_jitter(&state.settings))?;
trace!(
"get_tiles: cache miss{}: {:?}",
if expired { " (expired)" } else { "" },
&audience_key
);
metrics.incr("tiles_cache.miss");
state.tiles_cache.insert(
audience_key.clone(),
TilesState::Fresh {
tiles: tiles.clone(),
},
);
Ok(content_response(&tiles.content))
}
Err(e) => {
// Add some kind of stats to Retrieving or RetrievingFirst?
// do we need a kill switch if we're restricting like this already?
match e.kind() {
HandlerErrorKind::BadAdmResponse(es) => {
warn!("Bad response from ADM: {:?}", e);
metrics.incr_with_tags("tiles.invalid", Some(&tags));
state.tiles_cache.insert(
audience_key.clone(),
TilesState::Fresh {
tiles: Tiles::empty(add_jitter(&state.settings)),
},
);
// Report directly to sentry
// (This is starting to become a pattern. 🤔)
let mut tags = Tags::from_head(request.head(), settings);
tags.add_extra("err", es);
tags.add_tag("level", "warning");
l_sentry::report(&tags, sentry::event_from_error(&e));
warn!("ADM Server error: {:?}", e);
Ok(HttpResponse::NoContent().finish())
}
_ => Err(e),
match result {
Ok(response) => {
let tiles = cache::Tiles::new(response, add_jitter(&state.settings))?;
trace!(
"get_tiles: cache miss{}: {:?}",
if expired { " (expired)" } else { "" },
&audience_key
);
metrics.incr("tiles_cache.miss");
handle.insert(TilesState::Fresh {
tiles: tiles.clone(),
});
Ok(content_response(&tiles.content))
}
Err(e) => {
// Add some kind of stats to Retrieving or RetrievingFirst?
// do we need a kill switch if we're restricting like this already?
match e.kind() {
HandlerErrorKind::BadAdmResponse(es) => {
warn!("Bad response from ADM: {:?}", e);
metrics.incr_with_tags("tiles.invalid", Some(&tags));
handle.insert(TilesState::Fresh {
tiles: Tiles::empty(add_jitter(&state.settings)),
});
// Report directly to sentry
// (This is starting to become a pattern. 🤔)
let mut tags = Tags::from_head(request.head(), settings);
tags.add_extra("err", es);
tags.add_tag("level", "warning");
l_sentry::report(&tags, sentry::event_from_error(&e));
warn!("ADM Server error: {:?}", e);
Ok(HttpResponse::NoContent().finish())
}
_ => Err(e),
}
}
};

let result = handle_result();
// Cleanup the TilesState on errors
// TODO: potential panics are not currently cleaned up
if result.is_err() {
if expired {
// Back to Fresh (though the tiles are expired): so a later request
// will retry refreshing again
state
.tiles_cache
.alter(&audience_key, |_, tiles_state| match tiles_state {
TilesState::Refreshing { tiles } => TilesState::Fresh { tiles },
_ => tiles_state,
});
} else {
// Clear the entry: a later request will retry populating again
state
.tiles_cache
.remove_if(&audience_key, |_, tiles_state| {
matches!(tiles_state, TilesState::Populating)
});
}
}
result
}

fn content_response(content: &cache::TilesContent) -> HttpResponse {
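The removed TODO above ("potential panics are not currently cleaned up") is also addressed by this change: a default ScopeGuard runs its closure during unwinding as well as on ordinary early returns. A standalone sketch demonstrating that behaviour, assuming panic = unwind (the default profile setting):

use std::panic;

fn main() {
    let result = panic::catch_unwind(|| {
        // Armed cleanup; with scopeguard's default strategy it also runs
        // while unwinding out of this closure.
        let _guard = scopeguard::guard((), |_| {
            println!("cleanup ran during unwinding");
        });
        panic!("simulated handler panic");
    });
    // The guard fired before catch_unwind returned.
    assert!(result.is_err());
}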
