Skip to content

Commit

Permalink
Identify: start moving I/O from NetworkBehaviour,
Browse files Browse the repository at this point in the history
instead of responding pending replies from NetworkBehaviour, send them
back to ConnectionHandler.

ConnectionHandler for now just receives them, it's implementation of the
responding will come next.
  • Loading branch information
jxs committed Dec 7, 2022
1 parent cff7c4a commit d703ba6
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 81 deletions.
97 changes: 27 additions & 70 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::handler::{self, Proto, Push};
use crate::handler::{self, InEvent, Proto, Reply};
use crate::protocol::{Info, ReplySubstream, UpgradeError};
use futures::prelude::*;
use libp2p_core::{
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
};
Expand All @@ -35,7 +34,6 @@ use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
time::Duration,
Expand All @@ -51,8 +49,8 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending replies to send.
pending_replies: VecDeque<Reply>,
/// Pending requests to respond.
requests: VecDeque<Request>,
/// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
/// Peers to which an active push with current information about
Expand All @@ -63,18 +61,10 @@ pub struct Behaviour {
}

/// A pending reply to an inbound identification request.
enum Reply {
/// The reply is queued for sending.
Queued {
peer: PeerId,
io: ReplySubstream<NegotiatedSubstream>,
observed: Multiaddr,
},
/// The reply is being sent.
Sending {
peer: PeerId,
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
},
struct Request {
peer: PeerId,
io: ReplySubstream<NegotiatedSubstream>,
observed: Multiaddr,
}

/// Configuration for the [`identify::Behaviour`](Behaviour).
Expand Down Expand Up @@ -184,7 +174,7 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
requests: VecDeque::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
Expand Down Expand Up @@ -287,7 +277,7 @@ impl NetworkBehaviour for Behaviour {
with an established connection and calling `NetworkBehaviour::on_event` \
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed",
);
self.pending_replies.push_back(Reply::Queued {
self.requests.push_back(Request {
peer: peer_id,
io: sender,
observed: observed.clone(),
Expand All @@ -305,7 +295,7 @@ impl NetworkBehaviour for Behaviour {

fn poll(
&mut self,
cx: &mut Context<'_>,
_cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
Expand Down Expand Up @@ -333,7 +323,7 @@ impl NetworkBehaviour for Behaviour {
observed_addr,
};

(*peer, Push(info))
(*peer, InEvent::Push(info))
})
});

Expand All @@ -346,55 +336,21 @@ impl NetworkBehaviour for Behaviour {
});
}

// Check for pending replies to send.
if let Some(r) = self.pending_replies.pop_front() {
let mut sending = 0;
let to_send = self.pending_replies.len() + 1;
let mut reply = Some(r);
loop {
match reply {
Some(Reply::Queued { peer, io, observed }) => {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
let io = Box::pin(io.send(info));
reply = Some(Reply::Sending { peer, io });
}
Some(Reply::Sending { peer, mut io }) => {
sending += 1;
match Future::poll(Pin::new(&mut io), cx) {
Poll::Ready(Ok(())) => {
let event = Event::Sent { peer_id: peer };
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending => {
self.pending_replies.push_back(Reply::Sending { peer, io });
if sending == to_send {
// All remaining futures are NotReady
break;
} else {
reply = self.pending_replies.pop_front();
}
}
Poll::Ready(Err(err)) => {
let event = Event::Error {
peer_id: peer,
error: ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
}
}
None => unreachable!(),
}
}
// Check for pending requests to send back to the handler for reply.
if let Some(Request { peer, io, observed }) = self.requests.pop_front() {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::Any,
event: InEvent::Identify(Reply { peer, info, io }),
});
}

Poll::Pending
Expand Down Expand Up @@ -557,6 +513,7 @@ impl PeerCache {
mod tests {
use super::*;
use futures::pin_mut;
use futures::prelude::*;
use libp2p::mplex::MplexConfig;
use libp2p::noise;
use libp2p::tcp;
Expand Down
42 changes: 31 additions & 11 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ impl IntoConnectionHandler for Proto {
}
}

/// A reply to an inbound identification request.
#[derive(Debug)]
pub struct Reply {
pub peer: PeerId,
pub io: ReplySubstream<NegotiatedSubstream>,
pub info: Info,
}

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
Expand Down Expand Up @@ -106,9 +114,14 @@ pub enum Event {
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
}

/// Identifying information of the local node that is pushed to a remote.
#[derive(Debug)]
pub struct Push(pub Info);
#[allow(clippy::large_enum_variant)]
pub enum InEvent {
/// Identifying information of the local node that is pushed to a remote.
Push(Info),
/// Identifying information requested from this node.
Identify(Reply),
}

impl Handler {
/// Creates a new `Handler`.
Expand Down Expand Up @@ -195,7 +208,7 @@ impl Handler {
}

impl ConnectionHandler for Handler {
type InEvent = Push;
type InEvent = InEvent;
type OutEvent = Event;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<Protocol, PushProtocol<InboundPush>>;
Expand All @@ -207,14 +220,21 @@ impl ConnectionHandler for Handler {
SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ())
}

fn on_behaviour_event(&mut self, Push(push): Self::InEvent) {
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
EitherUpgrade::B(PushProtocol::outbound(push)),
(),
),
});
fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
InEvent::Push(push) => {
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
EitherUpgrade::B(PushProtocol::outbound(push)),
(),
),
});
}
InEvent::Identify(_) => {
todo!()
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
Expand Down

0 comments on commit d703ba6

Please sign in to comment.