Skip to content

Commit

Permalink
swarm/src/lib: Prioritize Behaviour over Pool and Pool over Listeners
Browse files Browse the repository at this point in the history
Have the main event loop (`Swarm::poll_next_event`) prioritize:

1. Work on `NetworkBehaviour` over work on `Pool`, thus prioritizing
   local work over work coming from a remote.

2. Work on `Pool` over work on `ListenersStream`, thus prioritizing work
   on existing connections over upgrading new incoming connections.
  • Loading branch information
mxinden committed May 2, 2022
1 parent 00346de commit 426023e
Showing 1 changed file with 65 additions and 61 deletions.
126 changes: 65 additions & 61 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,88 +1018,92 @@ where
// across a `Deref`.
let this = &mut *self;

// This loop polls the components below in a prioritized order.
//
// 1. [`NetworkBehaviour`]
// 2. Connection [`Pool`]
// 3. [`ListenersStream`]
//
// (1) is polled before (2) to prioritize local work over work coming from a remote.
//
// (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections.
loop {
let mut listeners_not_ready = false;
let mut connections_not_ready = false;
match this.pending_event.take() {
// Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous
// iteration to the connection handler(s).
Some((peer_id, handler, event)) => match handler {
PendingNotifyHandler::One(conn_id) => {
match this.pool.get_established(conn_id) {
Some(mut conn) => match notify_one(&mut conn, event, cx) {
None => continue,
Some(event) => {
this.pending_event = Some((peer_id, handler, event));
}
},
None => continue,
}
}
PendingNotifyHandler::Any(ids) => {
match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) {
None => continue,
Some((event, ids)) => {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
}
}
}
},
// No pending event. Allow the [`NetworkBehaviour`] to make progress.
None => {
let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &this.local_peer_id,
supported_protocols: &this.supported_protocols,
listened_addrs: &this.listened_addrs,
external_addrs: &this.external_addrs,
};
this.behaviour.poll(cx, &mut parameters)
};

// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut this.listeners), cx) {
Poll::Pending => listeners_not_ready = true,
Poll::Ready(listeners_event) => {
if let Some(swarm_event) = this.handle_listeners_event(listeners_event) {
return Poll::Ready(swarm_event);
match behaviour_poll {
Poll::Pending => {}
Poll::Ready(behaviour_event) => {
if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event)
{
return Poll::Ready(swarm_event);
}

continue;
}
}
}
}

// Poll the known peers.
match this.pool.poll(cx) {
Poll::Pending => connections_not_ready = true,
Poll::Pending => {}
Poll::Ready(pool_event) => {
if let Some(swarm_event) = this.handle_pool_event(pool_event) {
return Poll::Ready(swarm_event);
}
}
};

// After the network had a chance to make progress, try to deliver
// the pending event emitted by the behaviour in the previous iteration
// to the connection handler(s). The pending event must be delivered
// before polling the behaviour again. If the targeted peer
// meanwhie disconnected, the event is discarded.
if let Some((peer_id, handler, event)) = this.pending_event.take() {
match handler {
PendingNotifyHandler::One(conn_id) => {
if let Some(mut conn) = this.pool.get_established(conn_id) {
if let Some(event) = notify_one(&mut conn, event, cx) {
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
}
}
PendingNotifyHandler::Any(ids) => {
if let Some((event, ids)) =
notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx)
{
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
}
continue;
}
}

debug_assert!(this.pending_event.is_none());

let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &this.local_peer_id,
supported_protocols: &this.supported_protocols,
listened_addrs: &this.listened_addrs,
external_addrs: &this.external_addrs,
};
this.behaviour.poll(cx, &mut parameters)
};

match behaviour_poll {
Poll::Pending if listeners_not_ready && connections_not_ready => {
return Poll::Pending
}
// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut this.listeners), cx) {
Poll::Pending => {}
Poll::Ready(behaviour_event) => {
if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event) {
Poll::Ready(listeners_event) => {
if let Some(swarm_event) = this.handle_listeners_event(listeners_event) {
return Poll::Ready(swarm_event);
}

continue;
}
}

return Poll::Pending;
}
}
}
Expand Down

0 comments on commit 426023e

Please sign in to comment.