Fix deadlock by using scc instead of dashmap
Fixes elan-ev#1129

The "frozen Tobira" bug was caused by the following: we had two
in-memory hashmaps that cached different things. Each entry in that
cache has a time stamp and is evicted/ignored once it is too old. The
eviction is done by a long running task, that calls `retain` on the
cache. Of course, the cache is also read. In the case of the user cache
these steps are regularly performed:
1. Does an entry for this username exists? If yes:
2. Is this entry too old or differs from the new data? If yes:
3. Write new data to database.
4. Update the timestamp in the cache entry to mark it as "brand new".

The tricky part is this: step 3 contains an `.await` point (waiting for
the DB to reply). At that point, the Tokio runtime will pause that task
and run a different task on that same thread. If that different task
happens to be the maintenance task that calls `retain` on the hashmap,
then we are in trouble: the documentation of `retain` says:

> May deadlock if called when holding any sort of reference into the map

And in [this issue](xacrimon/dashmap#233) it
is clarified that this means:

> May deadlock if called when **the current thread is** holding any sort
> of reference into the map.

That is usually not a problem in non-async multi-threaded code. But in
async code, where a lock/reference can be held across an await point,
the same thread can end up holding a reference into the map while
calling `retain`, as the sketch below shows.
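
Here is a minimal, hypothetical sketch of the hazard (not actual Tobira code; `upsert`, `evict`, `fake_db_write`, and the key/value types are made up for illustration):

```rust
use std::time::Duration;
use dashmap::DashMap;

// Task A: roughly steps 1–4 from above, with the DB write in the middle.
async fn upsert(cache: &DashMap<String, u64>) {
    // `get` returns a guard that holds a shard lock for as long as it lives...
    if let Some(entry) = cache.get("alice") {
        // ...and this await point lets Tokio pause the task, with the lock
        // still held, and run a different task on the same thread.
        fake_db_write().await; // step 3: the DB round trip
        println!("cached value: {}", *entry); // step 4 would update the entry
    }
}

// Task B: the maintenance task.
async fn evict(cache: &DashMap<String, u64>) {
    // If this is polled on the thread that still holds the guard above,
    // `retain` blocks that thread waiting for the shard lock, and the task
    // that would release the lock can never run there again: deadlock.
    cache.retain(|_, last_update| *last_update < 100);
}

async fn fake_db_write() {
    tokio::time::sleep(Duration::from_millis(5)).await;
}
```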

The solution is to not do that: avoid holding locks across await points
(good advice in general anyway). However, that isn't trivial in our
case, and much more importantly: this is a subtle mistake that is far
too easy to make again. I don't want bugs like this cropping up
accidentally. I'm sure there is a clippy lint for it, but relying on
that still feels wrong to me. So I decided to use a different library
that does not have this problem. It's not yet as widely used, but it
seems very promising.
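
For contrast, a rough sketch of the async-friendly scc API (a hypothetical example, not code from this commit; the key/value types are made up): the locking operations have `*_async` variants that yield to the runtime instead of blocking the thread, so the pattern above cannot wedge a worker.

```rust
use std::time::Instant;
use scc::HashMap;

async fn demo(cache: &HashMap<String, Instant>) {
    // `entry_async` awaits instead of blocking the thread if the target
    // entry is currently locked by another task.
    cache.entry_async("alice".into()).await
        .or_insert_with(Instant::now);

    // The eviction side uses `retain_async`, which yields while walking
    // the map rather than locking everything at once.
    cache.retain_async(|_, ts| ts.elapsed().as_secs() < 600).await;
}
```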

The auth callback cache could also use `HashIndex`, which might be
faster. But for simplicity I kept both as `HashMap`s for now. Should be
plenty fast.
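
For reference, a `HashIndex` read path could look roughly like this (a purely hypothetical sketch with made-up types, not part of this commit): readers get epoch-protected snapshots instead of taking locks, which is why it might be faster for this read-heavy cache.

```rust
use scc::HashIndex;

// `HashIndex` requires `K: Clone` and `V: Clone`, since readers work on
// snapshots rather than locked references.
fn lookup(index: &HashIndex<String, u64>, key: &str) -> Option<u64> {
    // `peek_with` runs the closure against the current value, if any,
    // without acquiring any lock.
    index.peek_with(key, |_k, v| *v)
}
```
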
LukasKalbertodt committed Mar 12, 2024
1 parent 7401998 commit 812c489
Showing 4 changed files with 52 additions and 42 deletions.
21 changes: 7 additions & 14 deletions backend/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion backend/Cargo.toml

@@ -27,7 +27,6 @@
 chrono = { version = "0.4", default-features = false, features = ["serde", "std"] }
 clap = { version = "4.2.2", features = ["derive", "string"] }
 confique = { version = "0.2.0", default-features = false, features = ["toml"] }
 cookie = "0.18.0"
-dashmap = "5.5.3"
 deadpool = { version = "0.10.0", default-features = false, features = ["managed", "rt_tokio_1"] }
 deadpool-postgres = { version = "0.12.1", default-features = false, features = ["rt_tokio_1"] }
 elliptic-curve = { version = "0.13.4", features = ["jwk", "sec1"] }
@@ -65,6 +64,7 @@
 ring = "0.17.8"
 rustls = "0.22.2"
 rustls-native-certs = "0.7.0"
 rustls-pemfile = "2.1.0"
+scc = "2.0.17"
 secrecy = { version = "0.8", features = ["serde"] }
 serde = { version = "1.0.192", features = ["derive"] }
 serde_json = "1"
67 changes: 42 additions & 25 deletions backend/src/auth/cache.rs

@@ -1,9 +1,9 @@
 use std::{collections::HashSet, hash::Hash, time::{Instant, Duration}};
 
-use dashmap::{DashMap, mapref::entry::Entry};
 use deadpool_postgres::Client;
 use hyper::HeaderMap;
 use prometheus_client::metrics::counter::Counter;
+use scc::{hash_map::Entry, HashMap};
 
 use crate::{config::Config, prelude::*};
 
@@ -24,14 +24,14 @@ impl Caches
     /// Starts a daemon that regularly removes outdated entries from the cache.
     pub(crate) async fn maintainence_task(&self, config: &Config) -> ! {
-        fn cleanup<K: Eq + Hash, V>(
+        async fn cleanup<K: Eq + Hash, V>(
             now: Instant,
-            map: &DashMap<K, V>,
+            map: &HashMap<K, V>,
             cache_duration: Duration,
             mut timestamp: impl FnMut(&V) -> Instant,
         ) -> Option<Instant> {
             let mut out = None;
-            map.retain(|_, v| {
+            map.retain_async(|_, v| {
                 let instant = timestamp(v);
                 let is_outdated = now.saturating_duration_since(instant) > cache_duration;
                 if !is_outdated {
@@ -41,7 +41,7 @@ impl Caches
                     };
                 }
                 !is_outdated
-            });
+            }).await;
             out.map(|out| out + cache_duration)
         }
 
@@ -50,10 +50,18 @@ impl Caches
         loop {
             let now = Instant::now();
-            let next_user_action =
-                cleanup(now, &self.user.0, CACHE_DURATION, |v| v.last_written_to_db);
-            let next_callback_action =
-                cleanup(now, &self.callback.map, config.auth.callback.cache_duration, |v| v.timestamp);
+            let next_user_action = cleanup(
+                now,
+                &self.user.0,
+                CACHE_DURATION,
+                |v| v.last_written_to_db,
+            ).await;
+            let next_callback_action = cleanup(
+                now,
+                &self.callback.map,
+                config.auth.callback.cache_duration,
+                |v| v.timestamp,
+            ).await;
 
             // We will wait until the next entry in the hashmap gets stale, but
             // at least 30s to not do cleanup too often. In case there are no
@@ -77,6 +85,7 @@ impl Caches
 
 const CACHE_DURATION: Duration = Duration::from_secs(60 * 10);
 
+#[derive(Clone)]
 struct UserCacheEntry {
     display_name: String,
     email: Option<String>,
@@ -92,15 +101,15 @@ struct UserCacheEntry
 /// This works fine in multi-node setups: each node just has its local cache and
 /// prevents some DB writes. But as this data is never used otherwise, we don't
 /// run into data inconsistency problems.
-pub(crate) struct UserCache(DashMap<String, UserCacheEntry>);
+pub(crate) struct UserCache(HashMap<String, UserCacheEntry>);
 
 impl UserCache {
     fn new() -> Self {
-        Self(DashMap::new())
+        Self(HashMap::new())
     }
 
     pub(crate) async fn upsert_user_info(&self, user: &super::User, db: &Client) {
-        match self.0.entry(user.username.clone()) {
+        match self.0.entry_async(user.username.clone()).await {
             Entry::Occupied(mut occupied) => {
                 let entry = occupied.get();
                 let needs_update = entry.last_written_to_db.elapsed() > CACHE_DURATION
@@ -119,7 +128,7 @@ impl UserCache
             Entry::Vacant(vacant) => {
                 let res = Self::write_to_db(user, db).await;
                 if res.is_ok() {
-                    vacant.insert(UserCacheEntry {
+                    vacant.insert_entry(UserCacheEntry {
                         display_name: user.display_name.clone(),
                         email: user.email.clone(),
                         roles: user.roles.clone(),
@@ -167,7 +176,7 @@
 // ---------------------------------------------------------------------------
 
-#[derive(PartialEq, Eq)]
+#[derive(PartialEq, Eq, Clone)]
 struct AuthCallbackCacheKey(HeaderMap);
 
 impl Hash for AuthCallbackCacheKey {
@@ -191,6 +200,7 @@ impl Hash for AuthCallbackCacheKey
     }
 }
 
+#[derive(Clone)]
 struct AuthCallbackCacheEntry {
     user: Option<User>,
     timestamp: Instant,
@@ -199,7 +209,7 @@ struct AuthCallbackCacheEntry
 /// Cache for `auth-callback` calls.
 pub(crate) struct AuthCallbackCache {
-    map: DashMap<AuthCallbackCacheKey, AuthCallbackCacheEntry>,
+    map: HashMap<AuthCallbackCacheKey, AuthCallbackCacheEntry>,
     // Metrics
     hits: Counter,
     misses: Counter,
@@ -208,7 +218,7 @@ pub(crate) struct AuthCallbackCache
 impl AuthCallbackCache {
     fn new() -> Self {
         Self {
-            map: DashMap::new(),
+            map: HashMap::new(),
             hits: Counter::default(),
             misses: Counter::default(),
         }
@@ -225,13 +235,18 @@ impl AuthCallbackCache
         self.map.len()
     }
 
-    pub(super) fn get(&self, key: &HeaderMap, config: &CallbackConfig) -> Option<Option<User>> {
+    pub(super) async fn get(
+        &self,
+        key: &HeaderMap,
+        config: &CallbackConfig,
+    ) -> Option<Option<User>> {
         // TODO: this `clone` should not be necessary. It can be removed with
         // `#[repr(transparent)]` and an `unsafe`, but I don't want to just
        // throw around `unsafe` here.
-        let out = self.map.get(&AuthCallbackCacheKey(key.clone()))
-            .filter(|e| e.timestamp.elapsed() < config.cache_duration)
-            .map(|e| e.user.clone());
+        let out = self.map.get_async(&AuthCallbackCacheKey(key.clone()))
+            .await
+            .filter(|e| e.get().timestamp.elapsed() < config.cache_duration)
+            .map(|e| e.get().user.clone());
 
         match out.is_some() {
             true => self.hits.inc(),
@@ -241,11 +256,13 @@ impl AuthCallbackCache
         out
     }
 
-    pub(super) fn insert(&self, key: HeaderMap, user: Option<User>) {
-        self.map.insert(AuthCallbackCacheKey(key), AuthCallbackCacheEntry {
-            user,
-            timestamp: Instant::now(),
-        });
+    pub(super) async fn insert(&self, key: HeaderMap, user: Option<User>) {
+        self.map.entry_async(AuthCallbackCacheKey(key))
+            .await
+            .insert_entry(AuthCallbackCacheEntry {
+                user,
+                timestamp: Instant::now(),
+            });
     }
 }
4 changes: 2 additions & 2 deletions backend/src/auth/mod.rs

@@ -255,7 +255,7 @@ impl User
         if !ctx.config.auth.callback.cache_duration.is_zero() {
             header_copy = Some(req.headers().clone());
             let callback_config = &ctx.config.auth.callback;
-            if let Some(user) = ctx.auth_caches.callback.get(req.headers(), callback_config) {
+            if let Some(user) = ctx.auth_caches.callback.get(req.headers(), &callback_config).await {
                 return Ok(user);
             }
         }
@@ -265,7 +265,7 @@ impl User
 
         // Insert into cache
         if !ctx.config.auth.callback.cache_duration.is_zero() {
-            ctx.auth_caches.callback.insert(header_copy.unwrap(), out.clone());
+            ctx.auth_caches.callback.insert(header_copy.unwrap(), out.clone()).await;
         }
 
         Ok(out)
