Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged
merged 7 commits on Apr 7, 2021
96 changes: 56 additions & 40 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,13 @@ where
/// An action that the peer crawler can take.
#[allow(dead_code)]
enum CrawlerAction {
/// Handle a demand signal.
Demand,
/// Drop the demand signal because there are too many pending handshakes.
DemandDrop,
/// Initiate a handshake to `candidate` in response to demand.
DemandHandshake { candidate: MetaAddr },
/// Crawl existing peers for more peers in response to demand, because there
/// are no available candidates.
DemandCrawl,
/// Crawl existing peers for more peers in response to a timer `tick`.
TimerCrawl { tick: Instant },
/// Handle a successfully connected handshake `peer_set_change`.
Expand All @@ -297,7 +302,7 @@ enum CrawlerAction {
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
mut demand_tx: mpsc::Sender<()>,
demand_rx: mpsc::Receiver<()>,
mut demand_rx: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>,
mut connector: C,
mut success_tx: mpsc::Sender<PeerChange>,
Expand Down Expand Up @@ -328,7 +333,6 @@ where

let mut crawl_timer =
tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
let mut demand_rx = demand_rx.map(|()| Demand);

loop {
metrics::gauge!(
Expand All @@ -341,53 +345,65 @@ where

let crawler_action = tokio::select! {
a = handshakes.next() => a.expect("handshakes never terminates, because it contains a future that never resolves"),
a = crawl_timer.next() => a.expect("crawl_timer never terminates"),
a = demand_rx.next() => a.expect("demand_rx never fails, because we hold demand_tx"),
a = crawl_timer.next() => a.expect("timers never terminate"),
// turn the demand into an action, based on the crawler's current state
_ = demand_rx.next() => {
if handshakes.len() > 50 {
// Too many pending handshakes already
DemandDrop
} else if let Some(candidate) = candidates.next().await {
// candidates.next has a short delay, and briefly holds the address
// book lock, so it shouldn't hang
DemandHandshake { candidate }
} else {
DemandCrawl
}
}
};

match crawler_action {
Demand => {
if handshakes.len() > 50 {
// This is set to trace level because when the peerset is
// congested it can generate a lot of demand signal very
// rapidly.
trace!("too many in-flight handshakes, dropping demand signal");
continue;
}
DemandDrop => {
// This is set to trace level because when the peerset is
// congested it can generate a lot of demand signal very
// rapidly.
trace!("too many in-flight handshakes, dropping demand signal");
continue;
}
DemandHandshake { candidate } => {
// TODO: spawn independent task to avoid deadlocks
// candidates has a short delay, and briefly holds the address
// book lock, so it shouldn't hang
if let Some(candidate) = candidates.next().await {
debug!(?candidate.addr, "attempting outbound connection in response to demand");
// the connector is always ready, so this can't hang
connector.ready_and().await?;
// the handshake has timeouts, so it shouldn't hang
handshakes.push(Box::pin(connector.call(candidate.addr).map(
move |res| match res {
Ok(peer_set_change) => HandshakeConnected { peer_set_change },
Err(e) => {
debug!(?candidate.addr, ?e, "failed to connect to candidate");
HandshakeFailed {
failed_addr: candidate,
}
debug!(?candidate.addr, "attempting outbound connection in response to demand");
// the connector is always ready, so this can't hang
connector.ready_and().await?;
// the handshake has timeouts, so it shouldn't hang
handshakes.push(Box::pin(connector.call(candidate.addr).map(
move |res| match res {
Ok(peer_set_change) => HandshakeConnected { peer_set_change },
Err(e) => {
debug!(?candidate.addr, ?e, "failed to connect to candidate");
HandshakeFailed {
failed_addr: candidate,
}
},
)));
} else {
debug!("demand for peers but no available candidates");
// update has timeouts, and briefly holds the address book
// lock, so it shouldn't hang
candidates.update().await?;
// Try to connect to a new peer.
let _ = demand_tx.try_send(());
}
}
},
)));
}
DemandCrawl => {
debug!("demand for peers but no available candidates");
// update has timeouts, and briefly holds the address book
// lock, so it shouldn't hang
//
// TODO: refactor candidates into a buffered service, so we can
// spawn independent tasks to avoid deadlocks
candidates.update().await?;
// Try to connect to a new peer.
let _ = demand_tx.try_send(());
}
TimerCrawl { tick } => {
debug!(
?tick,
"crawling for more peers in response to the crawl timer"
);
// TODO: spawn independent task to avoid deadlocks
// TODO: spawn independent tasks to avoid deadlocks
candidates.update().await?;
// Try to connect to a new peer.
let _ = demand_tx.try_send(());
Expand Down