Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{core/,swarm/}: Dial with handler and return handler on error and closed #2191

Merged
merged 43 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a332591
core/: Return handler on connection error and closed
mxinden Aug 12, 2021
46904a6
swarm/: Inject handler on connection error and closed
mxinden Aug 12, 2021
d756be1
swarm/src/behaviour: Provide handler with Dial and DialAddr
mxinden Aug 12, 2021
595623f
Merge branch 'libp2p/master' into handler
mxinden Aug 18, 2021
705842f
swarm/src/behaviour: Add default trait para on NetworkBehaviourAction
mxinden Aug 18, 2021
d744b4a
core/src/connection/manager: Fully close a task on disconnect
mxinden Aug 18, 2021
5c2aef6
core/: Remove DisconnectedPeer::set_connected and Pool::add
mxinden Aug 18, 2021
ce0d278
core/src/connection: Report ConnectionLimit through task
mxinden Aug 18, 2021
425e777
core/: Emit Event::Failed on aborted pending connection
mxinden Aug 19, 2021
5ff6397
core/tests: Adjust to type changes
mxinden Aug 19, 2021
7d9285f
core/CHANGELOG: Add entry for ConnectionLimit change
mxinden Aug 19, 2021
682f6be
protocols/*: Update
mxinden Aug 19, 2021
9262c03
Merge branch 'libp2p/master' into handler
mxinden Aug 19, 2021
62c5e13
protocols/*: Update
mxinden Aug 19, 2021
174693a
swarm-derive: Adjust to changes
mxinden Aug 20, 2021
a78de13
core/: Fix ConectionClose and PendingAborted reporting
mxinden Aug 20, 2021
d4960b7
*: Format with rustfmt
mxinden Aug 20, 2021
b73139a
core/src/connection: Remove outdated doc comment
mxinden Aug 20, 2021
aa02e5f
swarm/src/toggle: Fix TODOs
mxinden Aug 20, 2021
2c9f0d3
protocols/: Remove unused imports
mxinden Aug 23, 2021
6d7c73a
Merge branch 'libp2p/master' into handler
mxinden Aug 23, 2021
a56980e
core/src/network/event: Use NoZeroU32
mxinden Aug 24, 2021
1c3ed2e
swarm/src/protocols_handler: Rename to into_protocols_handler
mxinden Aug 24, 2021
32fc84e
swarm/src/behaviour: Introduce NetworkBehaviour::inject_listen_failure
mxinden Aug 24, 2021
7ea4908
swarm/src/lib: Inject handler on DialPeerCondition false
mxinden Aug 24, 2021
fbd4681
core/src/connection: Assume manager to always close handler
mxinden Aug 25, 2021
6787e77
swarm-derive: Add comments
mxinden Aug 25, 2021
b864133
swarm: Add documentation
mxinden Aug 25, 2021
b2bf380
*: Format with rustfmt
mxinden Aug 25, 2021
7d342b6
swar/src/behaviour: Link to NotifyHandler not SendEvent
mxinden Aug 25, 2021
5853890
*: Update changelogs
mxinden Aug 25, 2021
4d0faf9
swarm-derive: Fix typo
mxinden Aug 25, 2021
7a45e7b
Apply suggestions from code review
mxinden Aug 26, 2021
cef949c
core/src/network: Revert map_err
mxinden Aug 26, 2021
ceb77e5
core/src/network: Use custom method on DialAttemptsRemaining
mxinden Aug 26, 2021
814ff4b
swarm: Add doc example for carrying state in handler
mxinden Aug 26, 2021
90df72a
Merge branch 'libp2p/master' into handler
mxinden Aug 26, 2021
60c7261
swarm/src/lib: Remove use_handler_to_carry_state
mxinden Aug 30, 2021
a2f1819
core/tests/network_dial_error: Use get_attempts
mxinden Aug 30, 2021
b905545
swarm/src/behaviour.rs: Use heading for doc example
mxinden Aug 30, 2021
99f81d0
core/tests: Format with rustfmt
mxinden Aug 30, 2021
1c5eb8e
Merge branch 'master' into handler
mxinden Aug 30, 2021
c09198e
protocols/gossipsub/src/behaviour.rs: Remove unnecesary assignment
mxinden Aug 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,12 @@ where
self.handler.inject_event(event);
}

// TODO: Update comment
//
/// Begins an orderly shutdown of the connection, returning a
/// `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> Close<TMuxer> {
self.muxing.close().0
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}

/// Polls the connection for events produced by the associated handler
Expand Down
4 changes: 3 additions & 1 deletion core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},

/// A connection has been established.
Expand Down Expand Up @@ -384,14 +385,15 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
new_endpoint: new,
}
}
task::Event::Closed { id, error } => {
task::Event::Closed { id, error, handler } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) => Event::ConnectionClosed {
id,
connected,
error,
handler,
},
TaskState::Pending => unreachable!(
"`Event::Closed` implies (2) occurred on that task and thus (3)."
Expand Down
26 changes: 21 additions & 5 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub enum Event<H: IntoConnectionHandler, TE> {
Closed {
id: TaskId,
error: Option<ConnectionError<THandlerError<H>>>,
handler: H::Handler,
},
}

Expand Down Expand Up @@ -177,7 +178,7 @@ where
},

/// The connection is closing (active close).
Closing(Close<M>),
Closing { closing_muxer: Close<M>, handler: H::Handler },

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<H, E>),
Expand Down Expand Up @@ -265,7 +266,11 @@ where
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
this.state = State::Closing(connection.close());
let (handler, closing_muxer) = connection.close();
this.state = State::Closing {
handler,
closing_muxer,
};
continue 'poll;
}
Poll::Ready(None) => {
Expand Down Expand Up @@ -324,36 +329,47 @@ where
Poll::Ready(Err(error)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// TODO: Good idea if there is already an error?
let (handler, _) = connection.close();
// Terminate the task with the error, dropping the connection.
let event = Event::Closed {
id,
error: Some(error),
handler,
};
this.state = State::Terminating(event);
}
}
}
}

State::Closing(mut closing) => {
State::Closing {
handler,
mut closing_muxer,
} => {
// Try to gracefully close the connection.
match closing.poll_unpin(cx) {
match closing_muxer.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed {
id: this.id,
error: None,
handler,
};
this.state = State::Terminating(event);
}
Poll::Ready(Err(e)) => {
let event = Event::Closed {
id: this.id,
error: Some(ConnectionError::IO(e)),
handler,
};
this.state = State::Terminating(event);
}
Poll::Pending => {
this.state = State::Closing(closing);
this.state = State::Closing {
handler,
closing_muxer,
};
return Poll::Pending;
}
}
Expand Down
52 changes: 32 additions & 20 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> {
pool: &'a mut Pool<THandler, TTransErr>,
/// The remaining number of established connections to the same peer.
num_established: u32,
handler: THandler::Handler,
},

/// A connection attempt failed.
Expand All @@ -114,7 +115,7 @@ pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> {
error: PendingConnectionError<TTransErr>,
/// The handler that was supposed to handle the connection,
/// if the connection failed before the handler was consumed.
handler: Option<THandler>,
handler: THandler,
/// The (expected) peer of the failed connection.
peer: Option<PeerId>,
/// A reference to the pool that managed the connection.
Expand Down Expand Up @@ -554,6 +555,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
num_established,
error: None,
pool: self,
handler: todo!(),
});
}

Expand All @@ -572,7 +574,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
id,
endpoint,
error,
handler: Some(handler),
handler: handler,
mxinden marked this conversation as resolved.
Show resolved Hide resolved
peer,
pool: self,
});
Expand All @@ -582,6 +584,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
id,
connected,
error,
handler,
} => {
let num_established =
if let Some(conns) = self.established.get_mut(&connected.peer_id) {
Expand All @@ -601,6 +604,7 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
error,
num_established,
pool: self,
handler,
});
}
manager::Event::ConnectionEstablished { entry } => {
Expand All @@ -610,30 +614,38 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {

// Check general established connection limit.
if let Err(e) = self.counters.check_max_established(&endpoint) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self,
});
// TODO: Good idea? How should we let the user know that the close
// happened due to a conneciton limit?
entry.start_close();
// let connected = entry.remove();
// return Poll::Ready(PoolEvent::PendingConnectionError {
// id,
// endpoint: connected.endpoint,
// error: PendingConnectionError::ConnectionLimit(e),
// handler: None,
// peer,
// pool: self,
// });
continue;
}

// Check per-peer established connection limit.
let current =
num_peer_established(&self.established, &entry.connected().peer_id);
if let Err(e) = self.counters.check_max_established_per_peer(current) {
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self,
});
// TODO: Good idea? How should we let the user know that the close
// happened due to a conneciton limit?
entry.start_close();
// let connected = entry.remove();
// return Poll::Ready(PoolEvent::PendingConnectionError {
// id,
// endpoint: connected.endpoint,
// error: PendingConnectionError::ConnectionLimit(e),
// handler: None,
// peer,
// pool: self,
// });
continue;
}

// Peer ID checks must already have happened. See `add_pending`.
Expand Down
37 changes: 19 additions & 18 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod event;
pub mod peer;

pub use crate::connection::{ConnectionCounters, ConnectionLimits};
pub use event::{IncomingConnection, NetworkEvent};
pub use event::{IncomingConnection, NetworkEvent, DialAttemptsRemaining};
pub use peer::Peer;

use crate::{
Expand Down Expand Up @@ -438,19 +438,22 @@ where
log::warn!("Dialing aborted: {:?}", e);
}
}
// TODO: Include handler in event.
event
}
Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
error,
num_established,
handler,
..
}) => NetworkEvent::ConnectionClosed {
id,
connected,
num_established,
error,
handler,
},
Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
NetworkEvent::ConnectionEvent { connection, event }
Expand Down Expand Up @@ -563,7 +566,7 @@ fn on_connection_failed<'a, TTrans, THandler>(
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
handler: Option<THandler>,
handler: THandler,
) -> (
Option<DialingOpts<PeerId, THandler>>,
NetworkEvent<'a, TTrans, THandlerInEvent<THandler>, THandlerOutEvent<THandler>, THandler>,
Expand Down Expand Up @@ -592,27 +595,21 @@ where
let failed_addr = attempt.current.1.clone();

let (opts, attempts_remaining) = if num_remain > 0 {
if let Some(handler) = handler {
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id,
handler,
address: next_attempt,
remaining: attempt.remaining,
};
(Some(opts), num_remain)
} else {
// The error is "fatal" for the dialing attempt, since
// the handler was already consumed. All potential
// remaining connection attempts are thus void.
(None, 0)
}
let next_attempt = attempt.remaining.remove(0);
let opts = DialingOpts {
peer: peer_id,
handler,
address: next_attempt,
remaining: attempt.remaining,
};
(Some(opts), DialAttemptsRemaining::Some(num_remain))
} else {
(None, 0)
(None, DialAttemptsRemaining::None(handler))
};

(
opts,
// TODO: This is the place to return the handler.
NetworkEvent::DialError {
attempts_remaining,
peer_id,
Expand All @@ -625,20 +622,24 @@ where
match endpoint {
ConnectedPoint::Dialer { address } => (
None,
// TODO: This is the place to return the handler.
NetworkEvent::UnknownPeerDialError {
multiaddr: address,
error,
handler,
},
),
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => (
None,
// TODO: This is the place to return the handler.
NetworkEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
handler,
},
),
}
Expand Down
Loading