diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index c35bfbae47f..d61836979e8 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -12,7 +12,7 @@ use futures::{ channel::{mpsc, oneshot}, prelude::*, }; -use tokio::{net::TcpStream, sync::broadcast, time::timeout}; +use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout}; use tokio_util::codec::Framed; use tower::Service; use tracing::{span, Level, Span}; @@ -180,6 +180,138 @@ where } } +/// Negotiate the Zcash network protocol version with the remote peer +/// at `addr`, using the connection `peer_conn`. +/// +/// We split `Handshake` into its components before calling this function, +/// to avoid infectious `Sync` bounds on the returned future. +pub async fn negotiate_version( + peer_conn: &mut Framed, + addr: &SocketAddr, + config: Config, + nonces: Arc>>, + user_agent: String, + our_services: PeerServices, + relay: bool, +) -> Result<(Version, PeerServices), HandshakeError> { + // Create a random nonce for this connection + let local_nonce = Nonce::default(); + nonces + .lock() + .expect("mutex should be unpoisoned") + .insert(local_nonce); + + // Don't leak our exact clock skew to our peers. On the other hand, + // we can't deviate too much, or zcashd will get confused. + // Inspection of the zcashd source code reveals that the timestamp + // is only ever used at the end of parsing the version message, in + // + // pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime()); + // + // AddTimeData is defined in src/timedata.cpp and is a no-op as long + // as the difference between the specified timestamp and the + // zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set + // to 10 * 60 seconds (10 minutes). + // + // nTimeOffset is peer metadata that is never used, except for + // statistics. + // + // To try to stay within the range where zcashd will ignore our clock skew, + // truncate the timestamp to the nearest 5 minutes. + let now = Utc::now().timestamp(); + let timestamp = Utc.timestamp(now - now.rem_euclid(5 * 60), 0); + + let our_version = Message::Version { + version: constants::CURRENT_VERSION, + services: our_services, + timestamp, + address_recv: (PeerServices::NODE_NETWORK, *addr), + // TODO: detect external address (#1893) + address_from: (our_services, config.listen_addr), + nonce: local_nonce, + user_agent: user_agent.clone(), + // The protocol works fine if we don't reveal our current block height, + // and not sending it means we don't need to be connected to the chain state. + start_height: block::Height(0), + relay, + }; + + debug!(?our_version, "sending initial version message"); + peer_conn.send(our_version).await?; + + let remote_msg = peer_conn + .next() + .await + .ok_or(HandshakeError::ConnectionClosed)??; + + // Check that we got a Version and destructure its fields into the local scope. + debug!(?remote_msg, "got message from remote peer"); + let (remote_nonce, remote_services, remote_version) = if let Message::Version { + nonce, + services, + version, + .. + } = remote_msg + { + (nonce, services, version) + } else { + Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))? + }; + + // Check for nonce reuse, indicating self-connection. + let nonce_reuse = { + let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned"); + let nonce_reuse = locked_nonces.contains(&remote_nonce); + // Regardless of whether we observed nonce reuse, clean up the nonce set. + locked_nonces.remove(&local_nonce); + nonce_reuse + }; + if nonce_reuse { + Err(HandshakeError::NonceReuse)?; + } + + peer_conn.send(Message::Verack).await?; + + let remote_msg = peer_conn + .next() + .await + .ok_or(HandshakeError::ConnectionClosed)??; + if let Message::Verack = remote_msg { + debug!("got verack from remote peer"); + } else { + Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))?; + } + + // XXX in zcashd remote peer can only send one version message and + // we would disconnect here if it received a second one. Is it even possible + // for that to happen to us here? + + // TODO: Reject incoming connections from nodes that don't know about the current epoch. + // zcashd does this: + // const Consensus::Params& consensusParams = chainparams.GetConsensus(); + // auto currentEpoch = CurrentEpoch(GetHeight(), consensusParams); + // if (pfrom->nVersion < consensusParams.vUpgrades[currentEpoch].nProtocolVersion) + // + // For approximately 1.5 days before a network upgrade, zcashd also: + // - avoids old peers, and + // - prefers updated peers. + // We haven't decided if we need this behaviour in Zebra yet (see #706). + // + // At the network upgrade, we also need to disconnect from old peers (see #1334). + // + // TODO: replace min_for_upgrade(network, MIN_NETWORK_UPGRADE) with + // current_min(network, height) where network is the + // configured network, and height is the best tip's block + // height. + + if remote_version < Version::min_for_upgrade(config.network, constants::MIN_NETWORK_UPGRADE) { + // Disconnect if peer is using an obsolete version. + Err(HandshakeError::ObsoleteVersion(remote_version))?; + } + + Ok((remote_version, remote_services)) +} + impl Service<(TcpStream, SocketAddr)> for Handshake where S: Service + Clone + Send + 'static, @@ -197,148 +329,51 @@ where fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future { let (tcp_stream, addr) = req; - let connector_span = span!(Level::INFO, "connector", addr = ?addr); + let connector_span = span!(Level::INFO, "connector", ?addr); // set the peer connection span's parent to the global span, as it // should exist independently of its creation source (inbound // connection, crawler, initial peer, ...) - let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", addr = ?addr); + let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", ?addr); // Clone these upfront, so they can be moved into the future. let nonces = self.nonces.clone(); let inbound_service = self.inbound_service.clone(); let timestamp_collector = self.timestamp_collector.clone(); let inv_collector = self.inv_collector.clone(); - let network = self.config.network; - let our_addr = self.config.listen_addr; + let config = self.config.clone(); let user_agent = self.user_agent.clone(); let our_services = self.our_services; let relay = self.relay; let fut = async move { - debug!("connecting to remote peer"); + debug!(?addr, "negotiating protocol version with remote peer"); // CORRECTNESS // // As a defence-in-depth against hangs, every send or next on stream // should be wrapped in a timeout. - let mut stream = Framed::new( + let mut peer_conn = Framed::new( tcp_stream, Codec::builder() - .for_network(network) + .for_network(config.network) .with_metrics_label(addr.ip().to_string()) .finish(), ); - let local_nonce = Nonce::default(); - nonces - .lock() - .expect("mutex should be unpoisoned") - .insert(local_nonce); - - // Don't leak our exact clock skew to our peers. On the other hand, - // we can't deviate too much, or zcashd will get confused. - // Inspection of the zcashd source code reveals that the timestamp - // is only ever used at the end of parsing the version message, in - // - // pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime()); - // - // AddTimeData is defined in src/timedata.cpp and is a no-op as long - // as the difference between the specified timestamp and the - // zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set - // to 10 * 60 seconds (10 minutes). - // - // nTimeOffset is peer metadata that is never used, except for - // statistics. - // - // To try to stay within the range where zcashd will ignore our clock skew, - // truncate the timestamp to the nearest 5 minutes. - let now = Utc::now().timestamp(); - let timestamp = Utc.timestamp(now - now.rem_euclid(5 * 60), 0); - - let version = Message::Version { - version: constants::CURRENT_VERSION, - services: our_services, - timestamp, - address_recv: (PeerServices::NODE_NETWORK, addr), - address_from: (our_services, our_addr), - nonce: local_nonce, - user_agent, - // The protocol works fine if we don't reveal our current block height, - // and not sending it means we don't need to be connected to the chain state. - start_height: block::Height(0), - relay, - }; - - debug!(?version, "sending initial version message"); - timeout(constants::REQUEST_TIMEOUT, stream.send(version)).await??; - - let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next()) - .await? - .ok_or(HandshakeError::ConnectionClosed)??; - - // Check that we got a Version and destructure its fields into the local scope. - debug!(?remote_msg, "got message from remote peer"); - let (remote_nonce, remote_services, remote_version) = if let Message::Version { - nonce, - services, - version, - .. - } = remote_msg - { - (nonce, services, version) - } else { - return Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg))); - }; - - // Check for nonce reuse, indicating self-connection. - let nonce_reuse = { - let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned"); - let nonce_reuse = locked_nonces.contains(&remote_nonce); - // Regardless of whether we observed nonce reuse, clean up the nonce set. - locked_nonces.remove(&local_nonce); - nonce_reuse - }; - if nonce_reuse { - return Err(HandshakeError::NonceReuse); - } - - timeout(constants::REQUEST_TIMEOUT, stream.send(Message::Verack)).await??; - - let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next()) - .await? - .ok_or(HandshakeError::ConnectionClosed)??; - if let Message::Verack = remote_msg { - debug!("got verack from remote peer"); - } else { - return Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg))); - } - - // XXX in zcashd remote peer can only send one version message and - // we would disconnect here if it received a second one. Is it even possible - // for that to happen to us here? - - // TODO: Reject incoming connections from nodes that don't know about the current epoch. - // zcashd does this: - // const Consensus::Params& consensusParams = chainparams.GetConsensus(); - // auto currentEpoch = CurrentEpoch(GetHeight(), consensusParams); - // if (pfrom->nVersion < consensusParams.vUpgrades[currentEpoch].nProtocolVersion) - // - // For approximately 1.5 days before a network upgrade, zcashd also: - // - avoids old peers, and - // - prefers updated peers. - // We haven't decided if we need this behaviour in Zebra yet (see #706). - // - // At the network upgrade, we also need to disconnect from old peers (see #1334). - // - // TODO: replace min_for_upgrade(network, MIN_NETWORK_UPGRADE) with - // current_min(network, height) where network is the - // configured network, and height is the best tip's block - // height. - - if remote_version < Version::min_for_upgrade(network, constants::MIN_NETWORK_UPGRADE) { - // Disconnect if peer is using an obsolete version. - return Err(HandshakeError::ObsoleteVersion(remote_version)); - } + // Wrap the entire initial connection setup in a timeout. + let (remote_version, remote_services) = timeout( + constants::HANDSHAKE_TIMEOUT, + negotiate_version( + &mut peer_conn, + &addr, + config, + nonces, + user_agent, + our_services, + relay, + ), + ) + .await??; // Set the connection's version to the minimum of the received version or our own. let negotiated_version = std::cmp::min(remote_version, constants::CURRENT_VERSION); @@ -348,7 +383,7 @@ where // XXX The tokio documentation says not to do this while any frames are still being processed. // Since we don't know that here, another way might be to release the tcp // stream from the unversioned Framed wrapper and construct a new one with a versioned codec. - let bare_codec = stream.codec_mut(); + let bare_codec = peer_conn.codec_mut(); bare_codec.reconfigure_version(negotiated_version); debug!("constructing client, spawning server"); @@ -365,7 +400,7 @@ where error_slot: slot.clone(), }; - let (peer_tx, peer_rx) = stream.split(); + let (peer_tx, peer_rx) = peer_conn.split(); // Instrument the peer's rx and tx streams. @@ -389,6 +424,7 @@ where // Every message and error must update the peer address state via // the inbound_ts_collector. let inbound_ts_collector = timestamp_collector.clone(); + let inv_collector = inv_collector.clone(); let peer_rx = peer_rx .then(move |msg| { // Add a metric for inbound messages and errors. @@ -625,13 +661,7 @@ where // Spawn a new task to drive this handshake. tokio::spawn(fut.instrument(connector_span)) - // This is required to get error types to line up. - // Probably there's a nicer way to express this using combinators. - .map(|x| match x { - Ok(Ok(client)) => Ok(client), - Ok(Err(handshake_err)) => Err(handshake_err.into()), - Err(join_err) => Err(join_err.into()), - }) + .map(|x: Result, JoinError>| Ok(x??)) .boxed() } }