-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make sure handshake version negotiation always has a timeout #2008
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ use futures::{ | |
channel::{mpsc, oneshot}, | ||
prelude::*, | ||
}; | ||
use tokio::{net::TcpStream, sync::broadcast, time::timeout}; | ||
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout}; | ||
use tokio_util::codec::Framed; | ||
use tower::Service; | ||
use tracing::{span, Level, Span}; | ||
|
@@ -180,6 +180,138 @@ where | |
} | ||
} | ||
|
||
/// Negotiate the Zcash network protocol version with the remote peer | ||
/// at `addr`, using the connection `peer_conn`. | ||
/// | ||
/// We split `Handshake` into its components before calling this function, | ||
/// to avoid infectious `Sync` bounds on the returned future. | ||
pub async fn negotiate_version( | ||
peer_conn: &mut Framed<TcpStream, Codec>, | ||
addr: &SocketAddr, | ||
config: Config, | ||
nonces: Arc<Mutex<HashSet<Nonce>>>, | ||
user_agent: String, | ||
our_services: PeerServices, | ||
relay: bool, | ||
) -> Result<(Version, PeerServices), HandshakeError> { | ||
// Create a random nonce for this connection | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
let local_nonce = Nonce::default(); | ||
nonces | ||
.lock() | ||
.expect("mutex should be unpoisoned") | ||
.insert(local_nonce); | ||
|
||
// Don't leak our exact clock skew to our peers. On the other hand, | ||
// we can't deviate too much, or zcashd will get confused. | ||
// Inspection of the zcashd source code reveals that the timestamp | ||
// is only ever used at the end of parsing the version message, in | ||
// | ||
// pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime()); | ||
// | ||
// AddTimeData is defined in src/timedata.cpp and is a no-op as long | ||
// as the difference between the specified timestamp and the | ||
// zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set | ||
// to 10 * 60 seconds (10 minutes). | ||
// | ||
// nTimeOffset is peer metadata that is never used, except for | ||
// statistics. | ||
// | ||
// To try to stay within the range where zcashd will ignore our clock skew, | ||
// truncate the timestamp to the nearest 5 minutes. | ||
let now = Utc::now().timestamp(); | ||
let timestamp = Utc.timestamp(now - now.rem_euclid(5 * 60), 0); | ||
|
||
let our_version = Message::Version { | ||
version: constants::CURRENT_VERSION, | ||
services: our_services, | ||
timestamp, | ||
address_recv: (PeerServices::NODE_NETWORK, *addr), | ||
// TODO: detect external address (#1893) | ||
address_from: (our_services, config.listen_addr), | ||
nonce: local_nonce, | ||
user_agent: user_agent.clone(), | ||
// The protocol works fine if we don't reveal our current block height, | ||
// and not sending it means we don't need to be connected to the chain state. | ||
start_height: block::Height(0), | ||
Comment on lines
+233
to
+235
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🥰 |
||
relay, | ||
}; | ||
|
||
debug!(?our_version, "sending initial version message"); | ||
peer_conn.send(our_version).await?; | ||
|
||
let remote_msg = peer_conn | ||
.next() | ||
.await | ||
.ok_or(HandshakeError::ConnectionClosed)??; | ||
|
||
// Check that we got a Version and destructure its fields into the local scope. | ||
debug!(?remote_msg, "got message from remote peer"); | ||
let (remote_nonce, remote_services, remote_version) = if let Message::Version { | ||
nonce, | ||
services, | ||
version, | ||
.. | ||
} = remote_msg | ||
{ | ||
(nonce, services, version) | ||
} else { | ||
Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))? | ||
}; | ||
|
||
// Check for nonce reuse, indicating self-connection. | ||
let nonce_reuse = { | ||
let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned"); | ||
let nonce_reuse = locked_nonces.contains(&remote_nonce); | ||
// Regardless of whether we observed nonce reuse, clean up the nonce set. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
locked_nonces.remove(&local_nonce); | ||
nonce_reuse | ||
}; | ||
if nonce_reuse { | ||
Err(HandshakeError::NonceReuse)?; | ||
} | ||
|
||
peer_conn.send(Message::Verack).await?; | ||
|
||
let remote_msg = peer_conn | ||
.next() | ||
.await | ||
.ok_or(HandshakeError::ConnectionClosed)??; | ||
if let Message::Verack = remote_msg { | ||
debug!("got verack from remote peer"); | ||
} else { | ||
Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))?; | ||
} | ||
|
||
// XXX in zcashd remote peer can only send one version message and | ||
// we would disconnect here if it received a second one. Is it even possible | ||
// for that to happen to us here? | ||
|
||
// TODO: Reject incoming connections from nodes that don't know about the current epoch. | ||
// zcashd does this: | ||
// const Consensus::Params& consensusParams = chainparams.GetConsensus(); | ||
// auto currentEpoch = CurrentEpoch(GetHeight(), consensusParams); | ||
// if (pfrom->nVersion < consensusParams.vUpgrades[currentEpoch].nProtocolVersion) | ||
// | ||
// For approximately 1.5 days before a network upgrade, zcashd also: | ||
// - avoids old peers, and | ||
// - prefers updated peers. | ||
// We haven't decided if we need this behaviour in Zebra yet (see #706). | ||
// | ||
// At the network upgrade, we also need to disconnect from old peers (see #1334). | ||
// | ||
// TODO: replace min_for_upgrade(network, MIN_NETWORK_UPGRADE) with | ||
// current_min(network, height) where network is the | ||
// configured network, and height is the best tip's block | ||
// height. | ||
|
||
if remote_version < Version::min_for_upgrade(config.network, constants::MIN_NETWORK_UPGRADE) { | ||
// Disconnect if peer is using an obsolete version. | ||
Err(HandshakeError::ObsoleteVersion(remote_version))?; | ||
} | ||
|
||
Ok((remote_version, remote_services)) | ||
} | ||
|
||
impl<S> Service<(TcpStream, SocketAddr)> for Handshake<S> | ||
where | ||
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static, | ||
|
@@ -197,148 +329,51 @@ where | |
fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future { | ||
let (tcp_stream, addr) = req; | ||
|
||
let connector_span = span!(Level::INFO, "connector", addr = ?addr); | ||
let connector_span = span!(Level::INFO, "connector", ?addr); | ||
// set the peer connection span's parent to the global span, as it | ||
// should exist independently of its creation source (inbound | ||
// connection, crawler, initial peer, ...) | ||
let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", addr = ?addr); | ||
let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", ?addr); | ||
|
||
// Clone these upfront, so they can be moved into the future. | ||
let nonces = self.nonces.clone(); | ||
let inbound_service = self.inbound_service.clone(); | ||
let timestamp_collector = self.timestamp_collector.clone(); | ||
let inv_collector = self.inv_collector.clone(); | ||
let network = self.config.network; | ||
let our_addr = self.config.listen_addr; | ||
let config = self.config.clone(); | ||
let user_agent = self.user_agent.clone(); | ||
let our_services = self.our_services; | ||
let relay = self.relay; | ||
|
||
let fut = async move { | ||
debug!("connecting to remote peer"); | ||
debug!(?addr, "negotiating protocol version with remote peer"); | ||
|
||
// CORRECTNESS | ||
// | ||
// As a defence-in-depth against hangs, every send or next on stream | ||
// should be wrapped in a timeout. | ||
let mut stream = Framed::new( | ||
let mut peer_conn = Framed::new( | ||
tcp_stream, | ||
Codec::builder() | ||
.for_network(network) | ||
.for_network(config.network) | ||
.with_metrics_label(addr.ip().to_string()) | ||
.finish(), | ||
); | ||
|
||
let local_nonce = Nonce::default(); | ||
nonces | ||
.lock() | ||
.expect("mutex should be unpoisoned") | ||
.insert(local_nonce); | ||
|
||
// Don't leak our exact clock skew to our peers. On the other hand, | ||
// we can't deviate too much, or zcashd will get confused. | ||
// Inspection of the zcashd source code reveals that the timestamp | ||
// is only ever used at the end of parsing the version message, in | ||
// | ||
// pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime()); | ||
// | ||
// AddTimeData is defined in src/timedata.cpp and is a no-op as long | ||
// as the difference between the specified timestamp and the | ||
// zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set | ||
// to 10 * 60 seconds (10 minutes). | ||
// | ||
// nTimeOffset is peer metadata that is never used, except for | ||
// statistics. | ||
// | ||
// To try to stay within the range where zcashd will ignore our clock skew, | ||
// truncate the timestamp to the nearest 5 minutes. | ||
let now = Utc::now().timestamp(); | ||
let timestamp = Utc.timestamp(now - now.rem_euclid(5 * 60), 0); | ||
|
||
let version = Message::Version { | ||
version: constants::CURRENT_VERSION, | ||
services: our_services, | ||
timestamp, | ||
address_recv: (PeerServices::NODE_NETWORK, addr), | ||
address_from: (our_services, our_addr), | ||
nonce: local_nonce, | ||
user_agent, | ||
// The protocol works fine if we don't reveal our current block height, | ||
// and not sending it means we don't need to be connected to the chain state. | ||
start_height: block::Height(0), | ||
relay, | ||
}; | ||
|
||
debug!(?version, "sending initial version message"); | ||
timeout(constants::REQUEST_TIMEOUT, stream.send(version)).await??; | ||
|
||
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next()) | ||
.await? | ||
.ok_or(HandshakeError::ConnectionClosed)??; | ||
|
||
// Check that we got a Version and destructure its fields into the local scope. | ||
debug!(?remote_msg, "got message from remote peer"); | ||
let (remote_nonce, remote_services, remote_version) = if let Message::Version { | ||
nonce, | ||
services, | ||
version, | ||
.. | ||
} = remote_msg | ||
{ | ||
(nonce, services, version) | ||
} else { | ||
return Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg))); | ||
}; | ||
|
||
// Check for nonce reuse, indicating self-connection. | ||
let nonce_reuse = { | ||
let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned"); | ||
let nonce_reuse = locked_nonces.contains(&remote_nonce); | ||
// Regardless of whether we observed nonce reuse, clean up the nonce set. | ||
locked_nonces.remove(&local_nonce); | ||
nonce_reuse | ||
}; | ||
if nonce_reuse { | ||
return Err(HandshakeError::NonceReuse); | ||
} | ||
|
||
timeout(constants::REQUEST_TIMEOUT, stream.send(Message::Verack)).await??; | ||
|
||
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next()) | ||
.await? | ||
.ok_or(HandshakeError::ConnectionClosed)??; | ||
if let Message::Verack = remote_msg { | ||
debug!("got verack from remote peer"); | ||
} else { | ||
return Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg))); | ||
} | ||
|
||
// XXX in zcashd remote peer can only send one version message and | ||
// we would disconnect here if it received a second one. Is it even possible | ||
// for that to happen to us here? | ||
|
||
// TODO: Reject incoming connections from nodes that don't know about the current epoch. | ||
// zcashd does this: | ||
// const Consensus::Params& consensusParams = chainparams.GetConsensus(); | ||
// auto currentEpoch = CurrentEpoch(GetHeight(), consensusParams); | ||
// if (pfrom->nVersion < consensusParams.vUpgrades[currentEpoch].nProtocolVersion) | ||
// | ||
// For approximately 1.5 days before a network upgrade, zcashd also: | ||
// - avoids old peers, and | ||
// - prefers updated peers. | ||
// We haven't decided if we need this behaviour in Zebra yet (see #706). | ||
// | ||
// At the network upgrade, we also need to disconnect from old peers (see #1334). | ||
// | ||
// TODO: replace min_for_upgrade(network, MIN_NETWORK_UPGRADE) with | ||
// current_min(network, height) where network is the | ||
// configured network, and height is the best tip's block | ||
// height. | ||
|
||
if remote_version < Version::min_for_upgrade(network, constants::MIN_NETWORK_UPGRADE) { | ||
// Disconnect if peer is using an obsolete version. | ||
return Err(HandshakeError::ObsoleteVersion(remote_version)); | ||
} | ||
// Wrap the entire initial connection setup in a timeout. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
let (remote_version, remote_services) = timeout( | ||
constants::HANDSHAKE_TIMEOUT, | ||
negotiate_version( | ||
&mut peer_conn, | ||
&addr, | ||
config, | ||
nonces, | ||
user_agent, | ||
our_services, | ||
relay, | ||
), | ||
) | ||
.await??; | ||
|
||
// Set the connection's version to the minimum of the received version or our own. | ||
let negotiated_version = std::cmp::min(remote_version, constants::CURRENT_VERSION); | ||
|
@@ -348,7 +383,7 @@ where | |
// XXX The tokio documentation says not to do this while any frames are still being processed. | ||
// Since we don't know that here, another way might be to release the tcp | ||
// stream from the unversioned Framed wrapper and construct a new one with a versioned codec. | ||
let bare_codec = stream.codec_mut(); | ||
let bare_codec = peer_conn.codec_mut(); | ||
bare_codec.reconfigure_version(negotiated_version); | ||
|
||
debug!("constructing client, spawning server"); | ||
|
@@ -365,7 +400,7 @@ where | |
error_slot: slot.clone(), | ||
}; | ||
|
||
let (peer_tx, peer_rx) = stream.split(); | ||
let (peer_tx, peer_rx) = peer_conn.split(); | ||
|
||
// Instrument the peer's rx and tx streams. | ||
|
||
|
@@ -389,6 +424,7 @@ where | |
// Every message and error must update the peer address state via | ||
// the inbound_ts_collector. | ||
let inbound_ts_collector = timestamp_collector.clone(); | ||
let inv_collector = inv_collector.clone(); | ||
let peer_rx = peer_rx | ||
.then(move |msg| { | ||
// Add a metric for inbound messages and errors. | ||
|
@@ -625,13 +661,7 @@ where | |
|
||
// Spawn a new task to drive this handshake. | ||
tokio::spawn(fut.instrument(connector_span)) | ||
// This is required to get error types to line up. | ||
// Probably there's a nicer way to express this using combinators. | ||
.map(|x| match x { | ||
Ok(Ok(client)) => Ok(client), | ||
Ok(Err(handshake_err)) => Err(handshake_err.into()), | ||
Err(join_err) => Err(join_err.into()), | ||
}) | ||
.map(|x: Result<Result<Client, HandshakeError>, JoinError>| Ok(x??)) | ||
teor2345 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.boxed() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'infectious' is a good way to put this 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried it, and they just got everywhere