Make sure handshake version negotiation always has a timeout #2008

Merged 1 commit on Apr 19, 2021
284 changes: 157 additions & 127 deletions zebra-network/src/peer/handshake.rs
@@ -12,7 +12,7 @@ use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use tokio::{net::TcpStream, sync::broadcast, time::timeout};
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
@@ -180,6 +180,138 @@ where
}
}

/// Negotiate the Zcash network protocol version with the remote peer
/// at `addr`, using the connection `peer_conn`.
///
/// We split `Handshake` into its components before calling this function,
/// to avoid infectious `Sync` bounds on the returned future.
Contributor:
'infectious' is a good way to put this 😄

Contributor Author:
I tried it, and they just got everywhere

pub async fn negotiate_version(
peer_conn: &mut Framed<TcpStream, Codec>,
addr: &SocketAddr,
config: Config,
nonces: Arc<Mutex<HashSet<Nonce>>>,
user_agent: String,
our_services: PeerServices,
relay: bool,
) -> Result<(Version, PeerServices), HandshakeError> {
// Create a random nonce for this connection
Contributor:
👍

let local_nonce = Nonce::default();
nonces
.lock()
.expect("mutex should be unpoisoned")
.insert(local_nonce);

// Don't leak our exact clock skew to our peers. On the other hand,
// we can't deviate too much, or zcashd will get confused.
// Inspection of the zcashd source code reveals that the timestamp
// is only ever used at the end of parsing the version message, in
//
// pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
//
// AddTimeData is defined in src/timedata.cpp and is a no-op as long
// as the difference between the specified timestamp and the
// zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set
// to 10 * 60 seconds (10 minutes).
//
// nTimeOffset is peer metadata that is never used, except for
// statistics.
//
// To try to stay within the range where zcashd will ignore our clock skew,
// round the timestamp down to a 5-minute boundary.
let now = Utc::now().timestamp();
let timestamp = Utc.timestamp(now - now.rem_euclid(5 * 60), 0);

let our_version = Message::Version {
version: constants::CURRENT_VERSION,
services: our_services,
timestamp,
address_recv: (PeerServices::NODE_NETWORK, *addr),
// TODO: detect external address (#1893)
address_from: (our_services, config.listen_addr),
nonce: local_nonce,
user_agent: user_agent.clone(),
// The protocol works fine if we don't reveal our current block height,
// and not sending it means we don't need to be connected to the chain state.
start_height: block::Height(0),
Contributor, on lines +233 to +235:
🥰

relay,
};

debug!(?our_version, "sending initial version message");
peer_conn.send(our_version).await?;

let remote_msg = peer_conn
.next()
.await
.ok_or(HandshakeError::ConnectionClosed)??;

// Check that we got a Version and destructure its fields into the local scope.
debug!(?remote_msg, "got message from remote peer");
let (remote_nonce, remote_services, remote_version) = if let Message::Version {
nonce,
services,
version,
..
} = remote_msg
{
(nonce, services, version)
} else {
Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))?
};

// Check for nonce reuse, indicating self-connection.
let nonce_reuse = {
let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned");
let nonce_reuse = locked_nonces.contains(&remote_nonce);
// Regardless of whether we observed nonce reuse, clean up the nonce set.
Contributor:
👍

locked_nonces.remove(&local_nonce);
nonce_reuse
};
if nonce_reuse {
Err(HandshakeError::NonceReuse)?;
}

peer_conn.send(Message::Verack).await?;

let remote_msg = peer_conn
.next()
.await
.ok_or(HandshakeError::ConnectionClosed)??;
if let Message::Verack = remote_msg {
debug!("got verack from remote peer");
} else {
Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))?;
}

// XXX: in zcashd, a remote peer can only send one version message, and
// zcashd disconnects if it receives a second one. Is it even possible
// for that to happen to us here?

// TODO: Reject incoming connections from nodes that don't know about the current epoch.
// zcashd does this:
// const Consensus::Params& consensusParams = chainparams.GetConsensus();
// auto currentEpoch = CurrentEpoch(GetHeight(), consensusParams);
// if (pfrom->nVersion < consensusParams.vUpgrades[currentEpoch].nProtocolVersion)
//
// For approximately 1.5 days before a network upgrade, zcashd also:
// - avoids old peers, and
// - prefers updated peers.
// We haven't decided if we need this behaviour in Zebra yet (see #706).
//
// At the network upgrade, we also need to disconnect from old peers (see #1334).
//
// TODO: replace min_for_upgrade(network, MIN_NETWORK_UPGRADE) with
// current_min(network, height) where network is the
// configured network, and height is the best tip's block
// height.

if remote_version < Version::min_for_upgrade(config.network, constants::MIN_NETWORK_UPGRADE) {
// Disconnect if peer is using an obsolete version.
Err(HandshakeError::ObsoleteVersion(remote_version))?;
}

Ok((remote_version, remote_services))
}
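
The timestamp rounding in `negotiate_version` can be checked in isolation. The sketch below works on plain `i64` seconds instead of `chrono` types, and the helper names `truncate_to_five_minutes` and `hidden_seconds` are hypothetical, not part of Zebra. It illustrates that `now - now.rem_euclid(5 * 60)` always rounds down to a 5-minute boundary, so the advertised time is at most 299 seconds behind the local clock, which stays inside the 10-minute zcashd threshold quoted in the comment.

```rust
/// Hypothetical helper: round a Unix timestamp (in seconds) down to the
/// previous 5-minute boundary, mirroring `now - now.rem_euclid(5 * 60)`.
fn truncate_to_five_minutes(now: i64) -> i64 {
    const FIVE_MINUTES: i64 = 5 * 60;
    // `rem_euclid` always returns a value in 0..300, even for negative
    // inputs, so the result never moves forward in time.
    now - now.rem_euclid(FIVE_MINUTES)
}

/// Hypothetical helper: how many seconds of precision the rounding hides.
fn hidden_seconds(now: i64) -> i64 {
    now - truncate_to_five_minutes(now)
}
```

Using `rem_euclid` rather than `%` keeps the rounding direction consistent for negative timestamps, where `%` would return a negative remainder.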

impl<S> Service<(TcpStream, SocketAddr)> for Handshake<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
@@ -197,148 +329,51 @@ where
fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future {
let (tcp_stream, addr) = req;

let connector_span = span!(Level::INFO, "connector", addr = ?addr);
let connector_span = span!(Level::INFO, "connector", ?addr);
// set the peer connection span's parent to the global span, as it
// should exist independently of its creation source (inbound
// connection, crawler, initial peer, ...)
let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", addr = ?addr);
let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", ?addr);

// Clone these upfront, so they can be moved into the future.
let nonces = self.nonces.clone();
let inbound_service = self.inbound_service.clone();
let timestamp_collector = self.timestamp_collector.clone();
let inv_collector = self.inv_collector.clone();
let network = self.config.network;
let our_addr = self.config.listen_addr;
let config = self.config.clone();
let user_agent = self.user_agent.clone();
let our_services = self.our_services;
let relay = self.relay;

let fut = async move {
debug!("connecting to remote peer");
debug!(?addr, "negotiating protocol version with remote peer");

// CORRECTNESS
//
// As a defence-in-depth against hangs, every send or next on stream
// should be wrapped in a timeout.
let mut stream = Framed::new(
let mut peer_conn = Framed::new(
tcp_stream,
Codec::builder()
.for_network(network)
.for_network(config.network)
.with_metrics_label(addr.ip().to_string())
.finish(),
);

let local_nonce = Nonce::default();
nonces
.lock()
.expect("mutex should be unpoisoned")
.insert(local_nonce);

// Don't leak our exact clock skew to our peers. On the other hand,
// we can't deviate too much, or zcashd will get confused.
// Inspection of the zcashd source code reveals that the timestamp
// is only ever used at the end of parsing the version message, in
//
// pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
//
// AddTimeData is defined in src/timedata.cpp and is a no-op as long
// as the difference between the specified timestamp and the
// zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set
// to 10 * 60 seconds (10 minutes).
//
// nTimeOffset is peer metadata that is never used, except for
// statistics.
//
// To try to stay within the range where zcashd will ignore our clock skew,
// truncate the timestamp to the nearest 5 minutes.
let now = Utc::now().timestamp();
let timestamp = Utc.timestamp(now - now.rem_euclid(5 * 60), 0);

let version = Message::Version {
version: constants::CURRENT_VERSION,
services: our_services,
timestamp,
address_recv: (PeerServices::NODE_NETWORK, addr),
address_from: (our_services, our_addr),
nonce: local_nonce,
user_agent,
// The protocol works fine if we don't reveal our current block height,
// and not sending it means we don't need to be connected to the chain state.
start_height: block::Height(0),
relay,
};

debug!(?version, "sending initial version message");
timeout(constants::REQUEST_TIMEOUT, stream.send(version)).await??;

let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;

// Check that we got a Version and destructure its fields into the local scope.
debug!(?remote_msg, "got message from remote peer");
let (remote_nonce, remote_services, remote_version) = if let Message::Version {
nonce,
services,
version,
..
} = remote_msg
{
(nonce, services, version)
} else {
return Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)));
};

// Check for nonce reuse, indicating self-connection.
let nonce_reuse = {
let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned");
let nonce_reuse = locked_nonces.contains(&remote_nonce);
// Regardless of whether we observed nonce reuse, clean up the nonce set.
locked_nonces.remove(&local_nonce);
nonce_reuse
};
if nonce_reuse {
return Err(HandshakeError::NonceReuse);
}

timeout(constants::REQUEST_TIMEOUT, stream.send(Message::Verack)).await??;

let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;
if let Message::Verack = remote_msg {
debug!("got verack from remote peer");
} else {
return Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)));
}

// XXX in zcashd remote peer can only send one version message and
// we would disconnect here if it received a second one. Is it even possible
// for that to happen to us here?

// TODO: Reject incoming connections from nodes that don't know about the current epoch.
// zcashd does this:
// const Consensus::Params& consensusParams = chainparams.GetConsensus();
// auto currentEpoch = CurrentEpoch(GetHeight(), consensusParams);
// if (pfrom->nVersion < consensusParams.vUpgrades[currentEpoch].nProtocolVersion)
//
// For approximately 1.5 days before a network upgrade, zcashd also:
// - avoids old peers, and
// - prefers updated peers.
// We haven't decided if we need this behaviour in Zebra yet (see #706).
//
// At the network upgrade, we also need to disconnect from old peers (see #1334).
//
// TODO: replace min_for_upgrade(network, MIN_NETWORK_UPGRADE) with
// current_min(network, height) where network is the
// configured network, and height is the best tip's block
// height.

if remote_version < Version::min_for_upgrade(network, constants::MIN_NETWORK_UPGRADE) {
// Disconnect if peer is using an obsolete version.
return Err(HandshakeError::ObsoleteVersion(remote_version));
}
// Wrap the entire initial connection setup in a timeout.
Contributor:
👍

let (remote_version, remote_services) = timeout(
constants::HANDSHAKE_TIMEOUT,
negotiate_version(
&mut peer_conn,
&addr,
config,
nonces,
user_agent,
our_services,
relay,
),
)
.await??;
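
The idea of one deadline over the whole multi-step negotiation, instead of a separate timeout on each `send` and `next`, can be sketched without an async runtime. The block below is only an analogy: it uses a standard-library thread and `recv_timeout` as a stand-in for `tokio::time::timeout`, and `with_deadline` and `SketchError` are hypothetical names that do not appear in Zebra.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum SketchError {
    /// The whole sequence did not deliver a result before the deadline.
    TimedOut,
}

/// Run `steps` (any number of sequential operations) under one overall
/// deadline, instead of giving each individual step its own timeout.
fn with_deadline<T, F>(limit: Duration, steps: F) -> Result<T, SketchError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already have given up; ignore the send error.
        let _ = tx.send(steps());
    });
    // Any failure to receive in time is reported as a timeout. That
    // includes the worker panicking and dropping the sender.
    rx.recv_timeout(limit).map_err(|_| SketchError::TimedOut)
}
```

Unlike dropping a future wrapped in `tokio::time::timeout`, this sketch does not cancel the worker thread when the deadline passes. It only shows why a single outer deadline bounds the total time, whereas per-step timeouts allow the total to grow with the number of steps.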

// Set the connection's version to the minimum of the received version or our own.
let negotiated_version = std::cmp::min(remote_version, constants::CURRENT_VERSION);
@@ -348,7 +383,7 @@ where
// XXX The tokio documentation says not to do this while any frames are still being processed.
// Since we don't know that here, another way might be to release the tcp
// stream from the unversioned Framed wrapper and construct a new one with a versioned codec.
let bare_codec = stream.codec_mut();
let bare_codec = peer_conn.codec_mut();
bare_codec.reconfigure_version(negotiated_version);

debug!("constructing client, spawning server");
@@ -365,7 +400,7 @@ where
error_slot: slot.clone(),
};

let (peer_tx, peer_rx) = stream.split();
let (peer_tx, peer_rx) = peer_conn.split();

// Instrument the peer's rx and tx streams.

@@ -389,6 +424,7 @@ where
// Every message and error must update the peer address state via
// the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone();
let inv_collector = inv_collector.clone();
let peer_rx = peer_rx
.then(move |msg| {
// Add a metric for inbound messages and errors.
@@ -625,13 +661,7 @@ where

// Spawn a new task to drive this handshake.
tokio::spawn(fut.instrument(connector_span))
// This is required to get error types to line up.
// Probably there's a nicer way to express this using combinators.
.map(|x| match x {
Ok(Ok(client)) => Ok(client),
Ok(Err(handshake_err)) => Err(handshake_err.into()),
Err(join_err) => Err(join_err.into()),
})
.map(|x: Result<Result<Client, HandshakeError>, JoinError>| Ok(x??))
teor2345 marked this conversation as resolved.
.boxed()
}
}
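
The final hunk replaces a three-arm `match` with `Ok(x??)`. A minimal sketch of why that works, using stand-in types: `JoinFailure`, `HandshakeFailure`, `CombinedError` and `flatten` are hypothetical names chosen for illustration, standing in for `JoinError`, `HandshakeError` and the boxed error type. Each `?` peels off one `Result` layer and converts its error through `From`, which is what the removed `match` arms did by hand with `.into()`.

```rust
#[derive(Debug, PartialEq)]
struct JoinFailure; // stand-in for the task join error

#[derive(Debug, PartialEq)]
struct HandshakeFailure; // stand-in for the handshake error

#[derive(Debug, PartialEq)]
enum CombinedError {
    Join,
    Handshake,
}

impl From<JoinFailure> for CombinedError {
    fn from(_: JoinFailure) -> Self {
        CombinedError::Join
    }
}

impl From<HandshakeFailure> for CombinedError {
    fn from(_: HandshakeFailure) -> Self {
        CombinedError::Handshake
    }
}

/// Flatten a nested result into a single-layer result.
fn flatten(
    x: Result<Result<u32, HandshakeFailure>, JoinFailure>,
) -> Result<u32, CombinedError> {
    // The first `?` unwraps the outer (join) layer, the second `?` unwraps
    // the inner (handshake) layer; both convert their error via `From`.
    Ok(x??)
}
```

The explicit type annotation on the closure argument in the diff plays the same role as the parameter type here: it tells the compiler which two error types the `?` operators need to convert.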