diff --git a/Cargo.lock b/Cargo.lock index f7fea0e2..6f45067a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5369,7 +5369,6 @@ dependencies = [ "libp2p-swarm", "libp2p-swarm-test", "lighthouse_network", - "parking_lot", "quick-protobuf", "serde", "serde_json", @@ -5380,6 +5379,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tracing", + "tracing-subscriber", "version", ] diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index 0450fdf9..ed2ebe98 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -26,7 +26,6 @@ libp2p = { version = "0.54", default-features = false, features = [ "request-response", ] } lighthouse_network = { workspace = true } -parking_lot = { workspace = true } quick-protobuf = "0.8.1" serde = { workspace = true } serde_json = "1.0.137" @@ -44,3 +43,4 @@ async-channel = { workspace = true } libp2p-swarm = { version = "0.45.1", features = ["macros"] } libp2p-swarm-test = { version = "0.4.0" } tokio = { workspace = true, features = ["rt", "macros", "time"] } +tracing-subscriber = { workspace = true } diff --git a/anchor/network/src/handshake/envelope/codec.rs b/anchor/network/src/handshake/codec.rs similarity index 51% rename from anchor/network/src/handshake/envelope/codec.rs rename to anchor/network/src/handshake/codec.rs index 7c7e7bf4..88f47984 100644 --- a/anchor/network/src/handshake/envelope/codec.rs +++ b/anchor/network/src/handshake/codec.rs @@ -1,11 +1,13 @@ -use crate::handshake::envelope; -use crate::handshake::envelope::{parse_envelope, Envelope}; +use crate::handshake::envelope::Envelope; +use crate::handshake::node_info::NodeInfo; +use crate::handshake::{envelope, node_info}; use async_trait::async_trait; use futures::{AsyncReadExt, AsyncWriteExt}; use libp2p::futures::{AsyncRead, AsyncWrite}; +use libp2p::identity::Keypair; use libp2p::{request_response, StreamProtocol}; use std::io; -use tracing::debug; +use tracing::trace; const MAXIMUM_SIZE: u64 = 1024; @@ -15,15 +17,53 @@ impl From for io::Error { } } +impl From for io::Error { + fn from(err: node_info::Error) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, err) + } +} + /// A `Codec` that reads/writes an **`Envelope`**. -#[derive(Clone, Debug, Default)] -pub struct Codec; +#[derive(Clone, Debug)] +pub struct Codec { + keypair: Keypair, +} + +impl Codec { + pub fn new(keypair: Keypair) -> Self { + Self { keypair } + } + + async fn read(&mut self, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut msg_buf = Vec::new(); + let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?; + trace!(?num_bytes_read, "read handshake"); + let env = Envelope::parse_and_verify(&msg_buf)?; + let node_info = NodeInfo::unmarshal(&env.payload)?; + Ok(node_info) + } + + async fn write(&mut self, io: &mut T, node_info: NodeInfo) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let envelope = node_info.seal(&self.keypair)?; + let raw = envelope.encode_to_vec()?; + io.write_all(&raw).await?; + io.flush().await?; + io.close().await?; + Ok(()) + } +} #[async_trait] impl request_response::Codec for Codec { type Protocol = StreamProtocol; - type Request = Envelope; - type Response = Envelope; + type Request = NodeInfo; + type Response = NodeInfo; async fn read_request( &mut self, @@ -33,14 +73,8 @@ impl request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { - debug!("reading handsake request"); - let mut msg_buf = Vec::new(); - let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?; - // TODO potentially try to read one more byte here and create a "message too large error" - debug!(?num_bytes_read, "read handshake request"); - let env = Envelope::decode_from_slice(&msg_buf)?; - debug!(?env, "decoded handshake request"); - Ok(env) + trace!("reading handshake request"); + self.read(io).await } async fn read_response( @@ -51,17 +85,8 @@ impl request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { - debug!("reading handshake response"); - let mut msg_buf = Vec::new(); - // We don't need a varint here because we always read only one message in the protocol. - // In this way we can just read until the end of the stream. - let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?; - debug!(?num_bytes_read, "read handshake response"); - - let env = parse_envelope(&msg_buf)?; - - debug!(?env, "decoded handshake response"); - Ok(env) + trace!("reading handshake response"); + self.read(io).await } async fn write_request( @@ -73,12 +98,8 @@ impl request_response::Codec for Codec { where T: AsyncWrite + Unpin + Send, { - debug!(req = ?req, "writing handshake request"); - let raw = req.encode_to_vec()?; - io.write_all(&raw).await?; - io.close().await?; - debug!("wrote handshake request"); - Ok(()) + trace!(req = ?req, "writing handshake request"); + self.write(io, req).await } async fn write_response( @@ -90,13 +111,7 @@ impl request_response::Codec for Codec { where T: AsyncWrite + Unpin + Send, { - debug!("writing handshake response"); - let raw = res - .encode_to_vec() - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - io.write_all(&raw).await?; - io.close().await?; - debug!("wrote handshake response"); - Ok(()) + trace!("writing handshake response"); + self.write(io, res).await } } diff --git a/anchor/network/src/handshake/envelope/generated/mod.rs b/anchor/network/src/handshake/envelope/generated/mod.rs deleted file mode 100644 index e216a501..00000000 --- a/anchor/network/src/handshake/envelope/generated/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod message; diff --git a/anchor/network/src/handshake/envelope/mod.rs b/anchor/network/src/handshake/envelope/mod.rs index 96d8a1d5..7bbaa427 100644 --- a/anchor/network/src/handshake/envelope/mod.rs +++ b/anchor/network/src/handshake/envelope/mod.rs @@ -1,8 +1,11 @@ -mod codec; -mod generated; +mod generated { + pub mod message; +} +use crate::handshake::envelope::Error::SignatureVerification; use crate::handshake::node_info::NodeInfo; use discv5::libp2p_identity::PublicKey; +pub use generated::message::pb::Envelope; use libp2p::identity::DecodingError; use quick_protobuf::{BytesReader, Error as ProtoError, MessageRead, MessageWrite, Writer}; use thiserror::Error; @@ -29,31 +32,31 @@ impl Envelope { } /// Decode an Envelope from a Protobuf byte array (like `proto.Unmarshal` in Go). - pub fn decode_from_slice(data: &[u8]) -> Result { + fn decode_from_slice(data: &[u8]) -> Result { let mut reader = BytesReader::from_bytes(data); let env = Envelope::from_reader(&mut reader, data).map_err(Error::Coding)?; Ok(env) } -} -/// Decodes an Envelope and verify signature. -pub fn parse_envelope(bytes: &[u8]) -> Result { - let env = Envelope::decode_from_slice(bytes)?; + /// Decodes an Envelope and verify signature. + pub fn parse_and_verify(bytes: &[u8]) -> Result { + let env = Envelope::decode_from_slice(bytes)?; - let domain = NodeInfo::DOMAIN; - let payload_type = NodeInfo::CODEC; + let domain = NodeInfo::DOMAIN; + let payload_type = NodeInfo::CODEC; - let unsigned = make_unsigned(domain.as_bytes(), payload_type, &env.payload); + let unsigned = make_unsigned(domain.as_bytes(), payload_type, &env.payload); - let pk = PublicKey::try_decode_protobuf(&env.public_key.to_vec())?; + let pk = PublicKey::try_decode_protobuf(&env.public_key.to_vec())?; - if !pk.verify(&unsigned?, &env.signature) { - return Err(SignatureVerification( - "signature verification failed".into(), - )); - } + if !pk.verify(&unsigned?, &env.signature) { + return Err(SignatureVerification( + "signature verification failed".into(), + )); + } - Ok(env) + Ok(env) + } } pub fn make_unsigned( @@ -70,7 +73,3 @@ pub fn make_unsigned( } Ok(buf) } - -use crate::handshake::envelope::Error::SignatureVerification; -pub use codec::Codec; -pub use generated::message::pb::Envelope; diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index e4029fa6..3f1a6a0a 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -1,256 +1,182 @@ +mod codec; mod envelope; pub mod node_info; -use crate::handshake::envelope::Codec; -use crate::handshake::envelope::Envelope; +use crate::handshake::codec::Codec; use crate::handshake::node_info::NodeInfo; -use crate::network::NodeInfoManager; use discv5::libp2p_identity::Keypair; -use discv5::multiaddr::Multiaddr; -use libp2p::core::transport::PortUse; -use libp2p::core::{ConnectedPoint, Endpoint}; use libp2p::request_response::{ - self, Behaviour as RequestResponseBehaviour, Config, Event as RequestResponseEvent, - InboundFailure, OutboundFailure, ProtocolSupport, ResponseChannel, -}; -use libp2p::swarm::{ - ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, - THandlerOutEvent, ToSwarm, + Behaviour as RequestResponseBehaviour, Config, InboundFailure, Message, OutboundFailure, + ProtocolSupport, ResponseChannel, }; +use libp2p::swarm::NetworkBehaviour; use libp2p::{PeerId, StreamProtocol}; -use std::task::{Context, Poll}; -use tracing::debug; +use tracing::trace; + +pub type Behaviour = RequestResponseBehaviour; +pub type Event = ::ToSwarm; #[derive(Debug)] pub enum Error { + /// We are not on the same network as the remote NetworkMismatch { ours: String, theirs: String }, + /// Serialization/Deserialization of the Node Info. NodeInfo(node_info::Error), + /// Error occurred while handling an incoming handshake. Inbound(InboundFailure), + /// Error occurred while handling an outgoing handshake. Outbound(OutboundFailure), } -/// Event emitted on handshake completion or failure. +/// We successfully completed a handshake. #[derive(Debug)] -pub enum Event { - Completed { - peer_id: PeerId, - their_info: NodeInfo, - }, - Failed { - peer_id: PeerId, - error: Error, - }, +pub struct Completed { + pub peer_id: PeerId, + pub their_info: NodeInfo, } -/// Network behaviour handling the handshake protocol. -pub struct Behaviour { - /// Request-response behaviour for the handshake protocol. - pub(crate) behaviour: RequestResponseBehaviour, - /// Keypair for signing envelopes. - keypair: Keypair, - /// Local node's information provider. - node_info_manager: NodeInfoManager, - /// Events to emit. - events: Vec, +/// The handshake either failed because of shaking with an incompatible peer or because of some +/// network failure. +#[derive(Debug)] +pub struct Failed { + pub peer_id: PeerId, + pub error: Box, } -impl Behaviour { - pub fn new(keypair: Keypair, local_node_info: NodeInfoManager) -> Self { - // NodeInfoProtocol is the protocol.ID used for handshake - const NODE_INFO_PROTOCOL: &str = "/ssv/info/0.0.1"; - - let protocol = StreamProtocol::new(NODE_INFO_PROTOCOL); - let behaviour = - RequestResponseBehaviour::new([(protocol, ProtocolSupport::Full)], Config::default()); - - Self { - behaviour, - keypair, - node_info_manager: local_node_info, - events: Vec::new(), - } - } - - /// Create a signed envelope containing local node info. - fn sealed_node_record(&self) -> Envelope { - let node_info = self.node_info_manager.get_node_info(); - node_info.seal(&self.keypair).unwrap() - } - - fn verify_node_info(&mut self, node_info: &NodeInfo) -> Result<(), Error> { - let ours = self.node_info_manager.get_node_info().network_id; - if node_info.network_id != *ours { - return Err(Error::NetworkMismatch { - ours, - theirs: node_info.network_id.clone(), - }); - } - Ok(()) - } - - fn handle_handshake_request( - &mut self, - peer_id: PeerId, - request: Envelope, - channel: ResponseChannel, - ) { - // Handle incoming request: send response then verify - let response = self.sealed_node_record(); - let _ = self.behaviour.send_response(channel, response.clone()); // Any error here is handled by the InboundFailure handler - - self.unmarshall_and_verify(peer_id, &request); - } - - fn handle_handshake_response(&mut self, peer_id: PeerId, response: &Envelope) { - self.unmarshall_and_verify(peer_id, response); - } - - fn unmarshall_and_verify(&mut self, peer_id: PeerId, envelope: &Envelope) { - let mut their_info = NodeInfo::default(); - - if let Err(e) = their_info.unmarshal(&envelope.payload) { - self.events.push(Event::Failed { - peer_id, - error: Error::NodeInfo(e), - }); - } +/// Create a libp2p Behaviour to handle handshake requests. Events emitted from this event must be +/// fed into [`handle_event`]. +pub fn create_behaviour(keypair: Keypair) -> Behaviour { + let protocol = StreamProtocol::new("/ssv/info/0.0.1"); + Behaviour::with_codec( + Codec::new(keypair), + [(protocol, ProtocolSupport::Full)], + Config::default(), + ) +} - match self.verify_node_info(&their_info) { - Ok(_) => self.events.push(Event::Completed { - peer_id, - their_info, - }), - Err(e) => self.events.push(Event::Failed { peer_id, error: e }), - } +fn verify_node_info(ours: &NodeInfo, theirs: &NodeInfo) -> Result<(), Error> { + if ours.network_id != theirs.network_id { + return Err(Error::NetworkMismatch { + ours: ours.network_id.clone(), + theirs: theirs.network_id.clone(), + }); } + Ok(()) } -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = - as NetworkBehaviour>::ConnectionHandler; - type ToSwarm = Event; - - fn handle_established_inbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, ConnectionDenied> { - self.behaviour.handle_established_inbound_connection( - connection_id, +/// Handle an [`Event`] emitted by the passed [`Behaviour`]. The passed [`NodeInfo`] is used for +/// validating the remote peer's data and for responding to incoming requests. +pub fn handle_event( + our_node_info: &NodeInfo, + behaviour: &mut Behaviour, + event: Event, +) -> Option> { + match event { + Event::Message { peer, - local_addr, - remote_addr, - ) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: Endpoint, - port_use: PortUse, - ) -> Result, ConnectionDenied> { - self.behaviour.handle_established_outbound_connection( - connection_id, + message: + Message::Request { + request_id: _, + request, + channel, + }, + } => Some(handle_request( + our_node_info, + behaviour, peer, - addr, - role_override, - port_use, - ) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - // Initiate handshake on new connection - if let FromSwarm::ConnectionEstablished(conn_est) = &event { - // Only send handshake request if we initiated the connection (outbound) - if let ConnectedPoint::Dialer { .. } = conn_est.endpoint { - let peer = conn_est.peer_id; - let request = self.sealed_node_record(); - self.behaviour.send_request(&peer, request); - } - } - - // Delegate other events to inner behaviour - self.behaviour.on_swarm_event(event); - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - event: THandlerOutEvent, - ) { - self.behaviour - .on_connection_handler_event(peer_id, connection_id, event); + request, + channel, + )), + Event::Message { + peer, + message: + Message::Response { + request_id: _, + response, + }, + } => Some(handle_response(our_node_info, peer, response)), + Event::OutboundFailure { + peer, + request_id: _, + error, + } => Some(Err(Failed { + peer_id: peer, + error: Box::new(Error::Outbound(error)), + })), + Event::InboundFailure { + peer, + request_id: _, + error, + } => Some(Err(Failed { + peer_id: peer, + error: Box::new(Error::Inbound(error)), + })), + Event::ResponseSent { .. } => None, } +} - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - // Process events from inner request-response behaviour - while let Poll::Ready(event) = self.behaviour.poll(cx) { - match event { - ToSwarm::GenerateEvent(event) => match event { - RequestResponseEvent::Message { - peer, - message: - request_response::Message::Request { - request, channel, .. - }, - } => { - debug!("Received handshake request"); - self.handle_handshake_request(peer, request, channel); - } - RequestResponseEvent::Message { - peer, - message: request_response::Message::Response { response, .. }, - } => { - debug!(?response, "Received handshake response"); - self.handle_handshake_response(peer, &response); - } - RequestResponseEvent::OutboundFailure { peer, error, .. } => { - self.events.push(Event::Failed { - peer_id: peer, - error: Error::Outbound(error), - }); - } - RequestResponseEvent::InboundFailure { peer, error, .. } => { - self.events.push(Event::Failed { - peer_id: peer, - error: Error::Inbound(error), - }); - } - _ => {} - }, - other => { - // Bubble up all other ToSwarm events. The closure is unreachable because we already handled GenerateEvent - return Poll::Ready( - other.map_out(|_| unreachable!("We already handled GenerateEvent")), - ); - } - } - } +fn handle_request( + our_node_info: &NodeInfo, + behaviour: &mut Behaviour, + peer_id: PeerId, + request: NodeInfo, + channel: ResponseChannel, +) -> Result { + trace!(?peer_id, "handling handshake request"); + // Handle incoming request: send response then verify + // Any error here is handled by the InboundFailure handler + let _ = behaviour.send_response(channel, our_node_info.clone()); + + verify_node_info(our_node_info, &request).map_err(|error| Failed { + peer_id, + error: Box::new(error), + })?; + + Ok(Completed { + peer_id, + their_info: request, + }) +} - // Emit queued events - if !self.events.is_empty() { - return Poll::Ready(ToSwarm::GenerateEvent(self.events.remove(0))); - } +fn handle_response( + our_node_info: &NodeInfo, + peer_id: PeerId, + response: NodeInfo, +) -> Result { + trace!(?peer_id, "handling handshake response"); + verify_node_info(our_node_info, &response).map_err(|error| Failed { + peer_id, + error: Box::new(error), + })?; + + Ok(Completed { + peer_id, + their_info: response, + }) +} - Poll::Pending - } +/// Send a handshake request to a specified peer. Should be called after establishing an outgoing +/// connection. +pub fn initiate(our_node_info: &NodeInfo, behaviour: &mut Behaviour, peer_id: PeerId) { + trace!(?peer_id, "initiating handshake"); + behaviour.send_request(&peer_id, our_node_info.clone()); } #[cfg(test)] mod tests { + // Init tracing + static TRACING: LazyLock<()> = LazyLock::new(|| { + let env_filter = tracing_subscriber::EnvFilter::new("trace"); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + }); + use super::*; use crate::handshake::node_info::NodeMetadata; use discv5::libp2p_identity::Keypair; - use libp2p_swarm::Swarm; - use libp2p_swarm_test::{drive, SwarmExt}; + use libp2p_swarm::{Swarm, SwarmEvent}; + use libp2p_swarm_test::SwarmExt; + use std::sync::LazyLock; + use tokio::select; fn node_info(network: &str, version: &str) -> NodeInfo { NodeInfo { @@ -264,36 +190,63 @@ mod tests { } } - fn test_behaviour(network: &str, version: &str, keypair: Keypair) -> Behaviour { - let node_info_manager = NodeInfoManager::new(node_info(network, version)); - Behaviour::new(keypair, node_info_manager) - } - #[tokio::test] async fn handshake_success() { + *TRACING; + let local_key = Keypair::generate_ed25519(); let remote_key = Keypair::generate_ed25519(); - let mut local_swarm = Swarm::new_ephemeral(|_| test_behaviour("test", "local", local_key)); - let mut remote_swarm = - Swarm::new_ephemeral(|_| test_behaviour("test", "remote", remote_key.clone())); + let mut local_swarm = Swarm::new_ephemeral(|_| create_behaviour(local_key)); + let local_node_info = node_info("test", "local"); + let mut remote_swarm = Swarm::new_ephemeral(|_| create_behaviour(remote_key)); + let remote_node_info = node_info("test", "remote"); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; remote_swarm.connect(&mut local_swarm).await; - // Drive the swarm until the handshake completes - let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = - drive(&mut local_swarm, &mut remote_swarm).await; - - assert!(matches!(local_event, - Event::Completed { peer_id, ref their_info } if peer_id == *remote_swarm.local_peer_id() && their_info.metadata.as_ref().unwrap().node_version == "remote") + initiate( + &remote_node_info, + remote_swarm.behaviour_mut(), + *local_swarm.local_peer_id(), ); - assert!(matches!(remote_event, - Event::Completed { peer_id, ref their_info } if peer_id == *local_swarm.local_peer_id() && their_info.metadata.as_ref().unwrap().node_version == "local") - ); + let mut local_completed = false; + let mut remote_completed = false; + + while !local_completed && !remote_completed { + select!( + SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { + let Some(result) = + handle_event(&local_node_info, local_swarm.behaviour_mut(), e) else { + continue; + }; + let Completed { + peer_id, + their_info, + } = result.expect("handshake to succeed"); + assert_eq!(peer_id, *remote_swarm.local_peer_id()); + assert_eq!(their_info.metadata.unwrap().node_version, "remote"); + local_completed = true; + } + SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { + let Some(result) = + handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) else { + continue; + }; + let Completed { + peer_id, + their_info, + } = result.expect("handshake to succeed"); + assert_eq!(peer_id, *local_swarm.local_peer_id()); + assert_eq!(their_info.metadata.unwrap().node_version, "local"); + remote_completed = true; + } + else => {} + ) + } }) .await .expect("tokio runtime failed"); @@ -301,32 +254,69 @@ mod tests { #[tokio::test] async fn mismatched_networks_handshake_failed() { + *TRACING; + let local_key = Keypair::generate_ed25519(); let remote_key = Keypair::generate_ed25519(); - let mut local_swarm = Swarm::new_ephemeral(|_| test_behaviour("test1", "local", local_key)); - let mut remote_swarm = - Swarm::new_ephemeral(|_| test_behaviour("test2", "remote", remote_key.clone())); + let mut local_swarm = Swarm::new_ephemeral(|_| create_behaviour(local_key)); + let local_node_info = node_info("test1", "local"); + let mut remote_swarm = Swarm::new_ephemeral(|_| create_behaviour(remote_key)); + let remote_node_info = node_info("test2", "remote"); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; remote_swarm.connect(&mut local_swarm).await; - // Drive the swarm until the handshake completes - let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = - drive(&mut local_swarm, &mut remote_swarm).await; - - assert!(matches!(remote_event, - Event::Failed { peer_id, error } if peer_id == *local_swarm.local_peer_id() - && matches!( - error, Error::NetworkMismatch { ref ours , ref theirs } if ours.as_str() == "test2" && theirs.as_str() == "test1")) + initiate( + &remote_node_info, + remote_swarm.behaviour_mut(), + *local_swarm.local_peer_id(), ); - assert!(matches!(local_event, - Event::Failed { peer_id, error } if peer_id == *remote_swarm.local_peer_id() - && matches!(error, Error::NetworkMismatch { ref ours, ref theirs } if ours.as_str() == "test1" && theirs.as_str() == "test2")) - ); + let mut local_failed = false; + let mut remote_failed = false; + + while !local_failed && !remote_failed { + select!( + SwarmEvent::Behaviour(e) = local_swarm.next_swarm_event() => { + let Some(result) = + handle_event(&local_node_info, local_swarm.behaviour_mut(), e) else { + continue; + }; + let Failed { + peer_id, + error, + } = result.expect_err("handshake to fail"); + let Error::NetworkMismatch { ours, theirs } = *error else { + panic!("expected network mismatch"); + }; + assert_eq!(peer_id, *remote_swarm.local_peer_id()); + assert_eq!(ours, "test1"); + assert_eq!(theirs, "test2"); + local_failed = true; + } + SwarmEvent::Behaviour(e) = remote_swarm.next_swarm_event() => { + let Some(result) = + handle_event(&remote_node_info, remote_swarm.behaviour_mut(), e) else { + continue; + }; + let Failed { + peer_id, + error, + } = result.expect_err("handshake to fail"); + let Error::NetworkMismatch { ours, theirs } = *error else { + panic!("expected network mismatch"); + }; + assert_eq!(peer_id, *local_swarm.local_peer_id()); + assert_eq!(ours, "test2"); + assert_eq!(theirs, "test1"); + remote_failed = true; + } + else => {} + ) + } }) .await .expect("tokio runtime failed"); diff --git a/anchor/network/src/handshake/node_info.rs b/anchor/network/src/handshake/node_info.rs index 78a8ba0b..87e84e98 100644 --- a/anchor/network/src/handshake/node_info.rs +++ b/anchor/network/src/handshake/node_info.rs @@ -1,9 +1,8 @@ use crate::handshake::envelope::{make_unsigned, Envelope}; +use crate::handshake::node_info::Error::Validation; use discv5::libp2p_identity::{Keypair, SigningError}; use serde::{Deserialize, Serialize}; use serde_json; - -use crate::handshake::node_info::Error::Validation; use thiserror::Error; #[derive(Debug, Error)] @@ -73,18 +72,20 @@ impl NodeInfo { } /// Deserialize `NodeInfo` from JSON bytes, replacing `self`. - pub fn unmarshal(&mut self, data: &[u8]) -> Result<(), Error> { + pub fn unmarshal(data: &[u8]) -> Result { let ser: Serializable = serde_json::from_slice(data)?; if ser.entries.len() < 2 { return Err(Validation("node info must have at least 2 entries".into())); } // skip ser.entries[0]: old forkVersion - self.network_id = ser.entries[1].clone(); - if ser.entries.len() >= 3 { + let network_id = ser.entries[1].clone(); + let metadata = if ser.entries.len() >= 3 { let meta = serde_json::from_slice(ser.entries[2].as_bytes())?; - self.metadata = Some(meta); - } - Ok(()) + Some(meta) + } else { + None + }; + Ok(NodeInfo::new(network_id, metadata)) } /// Seals a `Record` into an Envelope by: @@ -114,7 +115,7 @@ impl NodeInfo { #[cfg(test)] mod tests { - use crate::handshake::envelope::parse_envelope; + use crate::handshake::envelope::Envelope; use crate::handshake::node_info::{NodeInfo, NodeMetadata}; use libp2p::identity::Keypair; @@ -138,11 +139,9 @@ mod tests { let data = envelope.encode_to_vec().unwrap(); - let parsed_env = parse_envelope(&data).expect("Consume failed"); - let mut parsed_node_info = NodeInfo::default(); - parsed_node_info - .unmarshal(&parsed_env.payload) - .expect("TODO: panic message"); + let parsed_env = Envelope::parse_and_verify(&data).expect("Consume failed"); + let parsed_node_info = + NodeInfo::unmarshal(&parsed_env.payload).expect("TODO: panic message"); assert_eq!(node_info, parsed_node_info); @@ -151,11 +150,9 @@ mod tests { a00e42407120c7373762f6e6f6465696e666f1aa5017b22456e7472696573223a5b22222c22686f6c65736b7\ 9222c227b5c224e6f646556657273696f6e5c223a5c22676574682f785c222c5c22457865637574696f6e4e6f64655c223a5c22676574682f785c222c5c22436f6e73656e7375734e6f64655c223a5c22707279736d2f785c222c5c225375626e6574735c223a5c2230303030303030303030303030303030303030303030303030303030303030305c227d225d7d2a473045022100b8a2a668113330369e74b86ec818a87009e2a351f7ee4c0e431e1f659dd1bc3f02202b1ebf418efa7fb0541f77703bea8563234a1b70b8391d43daa40b6e7c3fcc84").unwrap(); - let parsed_env = parse_envelope(&encoded).expect("Consume failed"); - let mut parsed_node_info = NodeInfo::default(); - parsed_node_info - .unmarshal(&parsed_env.payload) - .expect("TODO: panic message"); + let parsed_env = Envelope::parse_and_verify(&encoded).expect("Consume failed"); + let parsed_node_info = + NodeInfo::unmarshal(&parsed_env.payload).expect("TODO: panic message"); assert_eq!(node_info, parsed_node_info); } @@ -183,19 +180,15 @@ mod tests { .expect("marshal_record should succeed"); // 2) Unmarshal into parsed_rec - let mut parsed_rec = NodeInfo::default(); - parsed_rec - .unmarshal(&data) - .expect("unmarshal_record should succeed"); + let parsed_rec = NodeInfo::unmarshal(&data).expect("unmarshal_record should succeed"); // 3) Now unmarshal the old format data into the same struct - parsed_rec - .unmarshal(old_serialized_data) - .expect("unmarshal old data should succeed"); + let old_format = + NodeInfo::unmarshal(old_serialized_data).expect("unmarshal old data should succeed"); // 4) Compare // The Go test checks reflect.DeepEqual(currentSerializedData, parsedRec) // We can do the same in Rust using assert_eq. - assert_eq!(current_data, parsed_rec); + assert_eq!(old_format, parsed_rec); } } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 1be3adbd..72228de7 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,11 +1,11 @@ use std::num::{NonZeroU8, NonZeroUsize}; use std::pin::Pin; -use std::sync::Arc; use std::time::Duration; use futures::StreamExt; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; +use libp2p::core::ConnectedPoint; use libp2p::gossipsub::{IdentTopic, MessageAuthenticity, ValidationMode}; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; @@ -13,29 +13,27 @@ use libp2p::swarm::SwarmEvent; use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; +use lighthouse_network::EnrExt; +use ssz::Decode; +use subnet_tracker::{SubnetEvent, SubnetId}; use task_executor::TaskExecutor; +use tokio::sync::mpsc; use tracing::{debug, error, info, trace, warn}; use crate::behaviour::AnchorBehaviour; use crate::behaviour::AnchorBehaviourEvent; use crate::discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}; +use crate::handshake::node_info::{NodeInfo, NodeMetadata}; use crate::keypair_utils::load_private_key; use crate::transport::build_transport; -use crate::Config; - -use crate::handshake::node_info::{NodeInfo, NodeMetadata}; -use crate::handshake::{Behaviour, Event}; use crate::types::ssv_message::SignedSSVMessage; -use lighthouse_network::EnrExt; -use parking_lot::RwLock; -use ssz::Decode; -use subnet_tracker::{SubnetEvent, SubnetId}; -use tokio::sync::mpsc; +use crate::{handshake, Config}; pub struct Network { swarm: Swarm, subnet_event_receiver: mpsc::Receiver, peer_id: PeerId, + node_info: NodeInfo, } impl Network { @@ -50,6 +48,16 @@ impl Network { let transport = build_transport(local_keypair.clone(), !config.disable_quic_support); let behaviour = build_anchor_behaviour(local_keypair.clone(), config).await; let peer_id = local_keypair.public().to_peer_id(); + let domain_type: String = config.domain_type.clone().into(); + let node_info = NodeInfo::new( + domain_type, + Some(NodeMetadata { + node_version: "1.0.0".to_string(), + execution_node: "geth/v1.10.8".to_string(), + consensus_node: "lighthouse/v1.5.0".to_string(), + subnets: "ffffffffffffffffffffffffffffffff".to_string(), + }), + ); let mut network = Network { swarm: build_swarm( @@ -61,6 +69,7 @@ impl Network { ), subnet_event_receiver, peer_id, + node_info, }; info!(%peer_id, "Network starting"); @@ -157,14 +166,31 @@ impl Network { } } } - AnchorBehaviourEvent::Handshake(ev) => { - handle_handshake_event(ev); + AnchorBehaviourEvent::Handshake(event) => { + if let Some(result) = handshake::handle_event( + &self.node_info, + &mut self.swarm.behaviour_mut().handshake, + event, + ) { + self.handle_handshake_result(result); + } } // TODO handle other behaviour events _ => { debug!(event = ?behaviour_event, "Unhandled behaviour event"); } }, + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint: ConnectedPoint::Dialer { .. }, + .. + } => { + handshake::initiate( + &self.node_info, + &mut self.swarm.behaviour_mut().handshake, + peer_id + ); + } // TODO handle other swarm events _ => { debug!(event = ?swarm_message, "Unhandled swarm event"); @@ -213,27 +239,27 @@ impl Network { } } } + + fn handle_handshake_result(&mut self, result: Result) { + match result { + Ok(handshake::Completed { + peer_id, + their_info, + }) => { + debug!(%peer_id, ?their_info, "Handshake completed"); + // Update peer store with their_info + } + Err(handshake::Failed { peer_id, error }) => { + debug!(%peer_id, ?error, "Handshake failed"); + } + } + } } fn subnet_to_topic(subnet: SubnetId) -> IdentTopic { IdentTopic::new(format!("ssv.{}", *subnet)) } -fn handle_handshake_event(ev: Event) { - match ev { - Event::Completed { - peer_id, - their_info, - } => { - debug!(%peer_id, ?their_info, "Handshake completed"); - // Update peer store with their_info - } - Event::Failed { peer_id, error } => { - debug!(%peer_id, ?error, "Handshake failed"); - } - } -} - async fn build_anchor_behaviour( local_keypair: Keypair, network_config: &Config, @@ -288,17 +314,7 @@ async fn build_anchor_behaviour( discovery }; - let domain_type: String = network_config.clone().domain_type.into(); - let node_info = NodeInfo::new( - domain_type, - Some(NodeMetadata { - node_version: "1.0.0".to_string(), - execution_node: "geth/v1.10.8".to_string(), - consensus_node: "lighthouse/v1.5.0".to_string(), - subnets: "ffffffffffffffffffffffffffffffff".to_string(), - }), - ); - let handshake = Behaviour::new(local_keypair.clone(), NodeInfoManager::new(node_info)); + let handshake = handshake::create_behaviour(local_keypair.clone()); AnchorBehaviour { identify, @@ -362,26 +378,6 @@ fn build_swarm( .build() } -pub struct NodeInfoManager { - node_info: Arc>, -} - -impl NodeInfoManager { - pub fn new(node_info: NodeInfo) -> Self { - Self { - node_info: Arc::new(RwLock::new(node_info)), - } - } - - pub fn get_node_info(&self) -> NodeInfo { - self.node_info.read().clone() - } - - pub fn set_node_info(&self, node_info: NodeInfo) { - *self.node_info.write() = node_info; - } -} - #[cfg(test)] mod test { use crate::network::Network;