Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/: Remove DisconnectedPeer::set_connected and Pool::add #2195

Merged
merged 4 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
- Require `ConnectionHandler::{InEvent,OutEvent,Error}` to implement `Debug`
(see [PR 2183]).

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).


[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195

# 0.29.0 [2021-07-12]

Expand Down
38 changes: 2 additions & 36 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, Connection, ConnectionError, ConnectionHandler,
IntoConnectionHandler, PendingConnectionError, Substream,
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -276,40 +276,6 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
ConnectionId(task_id)
}

/// Adds an existing connection to the manager.
pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected) -> ConnectionId
where
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<Substream = Substream<M>> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TE: error::Error + Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;

let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
self.tasks.insert(
task_id,
TaskInfo {
sender: tx,
state: TaskState::Established(info),
},
);

let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));

if let Some(executor) = &mut self.executor {
executor.exec(task);
} else {
self.local_spawns.push(task);
}

ConnectionId(task_id)
}

/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, THandlerInEvent<H>>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Expand Down
18 changes: 0 additions & 18 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,6 @@ where
},
}
}

/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
connection: Connection<M, H::Handler>,
) -> Self {
Task {
id,
events,
commands: commands.fuse(),
state: State::Established {
connection,
event: None,
},
}
}
}

/// The state associated with the `Task` of a connection.
Expand Down
36 changes: 2 additions & 34 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@

use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
manager::{self, Manager, ManagerConfig},
Connected, Connection, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
IncomingInfo, IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo,
IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
ConnectedPoint, PeerId,
Expand Down Expand Up @@ -313,37 +312,6 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
id
}

/// Adds an existing established connection to the pool.
///
/// Returns the assigned connection ID on success. An error is returned
/// if the configured maximum number of established connections for the
/// connected peer has been reached.
pub fn add<TMuxer>(
&mut self,
c: Connection<TMuxer, THandler::Handler>,
i: Connected,
) -> Result<ConnectionId, ConnectionLimit>
where
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler:
ConnectionHandler<Substream = connection::Substream<TMuxer>> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_established(&i.endpoint)?;
self.counters
.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
let id = self.manager.add(c, i.clone());
self.counters.inc_established(&i.endpoint);
self.established
.entry(i.peer_id)
.or_default()
.insert(id, i.endpoint);
Ok(id)
}

/// Gets an entry representing a connection in the pool.
///
/// Returns `None` if the pool has no connection with the given ID.
Expand Down
44 changes: 3 additions & 41 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
use super::{DialError, DialingOpts, Network};
use crate::{
connection::{
handler::THandlerInEvent, pool::Pool, Connected, ConnectedPoint, Connection,
ConnectionHandler, ConnectionId, ConnectionLimit, EstablishedConnection,
EstablishedConnectionIter, IntoConnectionHandler, PendingConnection, Substream,
handler::THandlerInEvent, pool::Pool, ConnectedPoint, ConnectionHandler, ConnectionId,
ConnectionLimit, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler,
PendingConnection, Substream,
},
Multiaddr, PeerId, StreamMuxer, Transport,
};
Expand Down Expand Up @@ -472,44 +472,6 @@ where
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Disconnected(self)
}

/// Moves the peer into a connected state by supplying an existing
/// established connection.
///
/// No event is generated for this action.
///
/// # Panics
///
/// Panics if `connected.peer_id` does not identify the current peer.
pub fn set_connected<TMuxer>(
self,
connected: Connected,
connection: Connection<TMuxer, THandler::Handler>,
) -> Result<ConnectedPeer<'a, TTrans, THandler>, ConnectionLimit>
where
THandler: Send + 'static,
TTrans::Error: Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
{
if connected.peer_id != self.peer_id {
panic!(
"Invalid peer ID given: {:?}. Expected: {:?}",
connected.peer_id, self.peer_id
)
}

self.network
.pool
.add(connection, connected)
.map(move |_id| ConnectedPeer {
network: self.network,
peer_id: self.peer_id,
})
}
}

/// The (internal) state of a `DialingAttempt`, tracking the
Expand Down