Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Commit

Permalink
Merge #271
Browse files Browse the repository at this point in the history
271: Adjust subscription locking r=ljedrz a=koivunej

Fixes the number of critical sections in SubscriptionFuture from 2 to 1, helping out with the hangs in exchange_block which have been recorded in #134. Also backports the `related_subs` cleanup from #264.

Co-authored-by: Joonas Koivunen <joonas@equilibrium.co>
  • Loading branch information
bors[bot] and Joonas Koivunen committed Jul 30, 2020
2 parents 39feb8e + 7b71137 commit c55298b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 27 deletions.
94 changes: 68 additions & 26 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl<TRes: Debug + Clone + PartialEq> SubscriptionRegistry<TRes> {
id,
kind,
subscriptions: Arc::clone(&self.subscriptions),
cleanup_complete: false,
}
}

Expand Down Expand Up @@ -284,37 +285,66 @@ pub struct SubscriptionFuture<TRes: Debug + PartialEq> {
kind: RequestKind,
/// A reference to the subscriptions at the `SubscriptionRegistry`.
subscriptions: Arc<Mutex<Subscriptions<TRes>>>,
/// True if the cleanup is already done, false if `Drop` needs to do it
cleanup_complete: bool,
}

impl<TRes: Debug + PartialEq> Future for SubscriptionFuture<TRes> {
type Output = Result<TRes, Cancelled>;

fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let mut subscription = {
// don't hold the lock for too long, otherwise the `Drop` impl for `SubscriptionFuture`
// can cause a stack overflow
let mut subscriptions = task::block_on(async { self.subscriptions.lock().await });
if let Some(sub) = subscriptions
.get_mut(&self.kind)
.and_then(|subs| subs.remove(&self.id))
{
sub
} else {
// the subscription must already have been cancelled
return Poll::Ready(Err(Cancelled));
}
};

match subscription {
Subscription::Cancelled => Poll::Ready(Err(Cancelled)),
Subscription::Pending { ref mut waker, .. } => {
*waker = Some(context.waker().clone());
task::block_on(async { self.subscriptions.lock().await })
.get_mut(&self.kind)
.and_then(|subs| subs.insert(self.id, subscription));
Poll::Pending
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
use std::collections::hash_map::Entry::*;

// FIXME: using task::block_on ever is quite unfortunate. alternatives which have been
// discussed:
//
// - going back to std::sync::Mutex
// - using a state machine
//
// std::sync::Mutex might be ok here as long as we don't really need to await after
// acquiring. implementing the state machine manually might not be possible as all mutexes
// lock futures seem to need a borrow, however using async fn does not allow implementing
// Drop.
let mut subscriptions = task::block_on(async { self.subscriptions.lock().await });

if let Some(related_subs) = subscriptions.get_mut(&self.kind) {
let (became_empty, ret) = match related_subs.entry(self.id) {
// there were no related subs, it can only mean cancellation or polling after
// Poll::Ready
Vacant(_) => return Poll::Ready(Err(Cancelled)),
Occupied(mut oe) => {
let unwrapped = match oe.get_mut() {
Subscription::Pending { ref mut waker, .. } => {
// waker may have changed since the last time
*waker = Some(context.waker().clone());
return Poll::Pending;
}
Subscription::Cancelled => {
oe.remove();
Err(Cancelled)
}
_ => match oe.remove() {
Subscription::Ready(result) => Ok(result),
_ => unreachable!("already matched"),
},
};

(related_subs.is_empty(), unwrapped)
}
};

if became_empty {
// early cleanup if possible for cancelled and ready. the pending variant has the
// chance of sending out the cancellation message so it cannot be treated here
subscriptions.remove(&self.kind);
}
Subscription::Ready(result) => Poll::Ready(Ok(result)),

// need to drop manually to aid borrowck at least in 1.45
drop(subscriptions);
self.cleanup_complete = became_empty;
Poll::Ready(ret)
} else {
Poll::Ready(Err(Cancelled))
}
}
}
Expand All @@ -323,6 +353,11 @@ impl<TRes: Debug + PartialEq> Drop for SubscriptionFuture<TRes> {
fn drop(&mut self) {
debug!("Dropping subscription {} to {}", self.id, self.kind);

if self.cleanup_complete {
// cleaned up the easier variants already
return;
}

let (sub, is_last) = task::block_on(async {
let mut subscriptions = self.subscriptions.lock().await;
let related_subs = if let Some(subs) = subscriptions.get_mut(&self.kind) {
Expand All @@ -334,13 +369,20 @@ impl<TRes: Debug + PartialEq> Drop for SubscriptionFuture<TRes> {
// check if this is the last subscription to this resource
let is_last = related_subs.is_empty();

if is_last {
subscriptions.remove(&self.kind);
}

(sub, is_last)
});

if let Some(sub) = sub {
// don't bother updating anything that isn't `Pending`
if let mut sub @ Subscription::Pending { .. } = sub {
debug!("It was the last related subscription, sending a cancel notification");
debug!(
"Last related subscription dropped, sending a cancel notification for {} to {}",
self.id, self.kind
);
sub.cancel(self.kind.clone(), is_last);
}
}
Expand Down
3 changes: 2 additions & 1 deletion tests/wantlist_and_cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ async fn check_cid_subscriptions(ipfs: &Node, cid: &Cid, expected_count: usize)
assert_eq!(subs.len(), 1);
}
let subscription_count = subs.get(&cid.clone().into()).map(|l| l.len());
assert_eq!(subscription_count, Some(expected_count));
// treat None as 0
assert_eq!(subscription_count.unwrap_or(0), expected_count);
}

/// Check if canceling a Cid affects the wantlist.
Expand Down

0 comments on commit c55298b

Please sign in to comment.