Skip to content

Commit

Permalink
Handshake followup (#138)
Browse files Browse the repository at this point in the history
Co-authored-by: João Oliveira <hello@jxs.pt>
  • Loading branch information
dknopik and jxs authored Feb 17, 2025
1 parent bf2fed5 commit 7378fc0
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 395 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion anchor/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ libp2p = { version = "0.54", default-features = false, features = [
"request-response",
] }
lighthouse_network = { workspace = true }
parking_lot = { workspace = true }
quick-protobuf = "0.8.1"
serde = { workspace = true }
serde_json = "1.0.137"
Expand All @@ -44,3 +43,4 @@ async-channel = { workspace = true }
libp2p-swarm = { version = "0.45.1", features = ["macros"] }
libp2p-swarm-test = { version = "0.4.0" }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
tracing-subscriber = { workspace = true }
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::handshake::envelope;
use crate::handshake::envelope::{parse_envelope, Envelope};
use crate::handshake::envelope::Envelope;
use crate::handshake::node_info::NodeInfo;
use crate::handshake::{envelope, node_info};
use async_trait::async_trait;
use futures::{AsyncReadExt, AsyncWriteExt};
use libp2p::futures::{AsyncRead, AsyncWrite};
use libp2p::identity::Keypair;
use libp2p::{request_response, StreamProtocol};
use std::io;
use tracing::debug;
use tracing::trace;

const MAXIMUM_SIZE: u64 = 1024;

Expand All @@ -15,15 +17,53 @@ impl From<envelope::Error> for io::Error {
}
}

impl From<node_info::Error> for io::Error {
fn from(err: node_info::Error) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, err)
}
}

/// A `Codec` that reads/writes an **`Envelope`**.
#[derive(Clone, Debug, Default)]
pub struct Codec;
#[derive(Clone, Debug)]
pub struct Codec {
keypair: Keypair,
}

impl Codec {
pub fn new(keypair: Keypair) -> Self {
Self { keypair }
}

async fn read<T>(&mut self, io: &mut T) -> io::Result<NodeInfo>
where
T: AsyncRead + Unpin + Send,
{
let mut msg_buf = Vec::new();
let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?;
trace!(?num_bytes_read, "read handshake");
let env = Envelope::parse_and_verify(&msg_buf)?;
let node_info = NodeInfo::unmarshal(&env.payload)?;
Ok(node_info)
}

async fn write<T>(&mut self, io: &mut T, node_info: NodeInfo) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let envelope = node_info.seal(&self.keypair)?;
let raw = envelope.encode_to_vec()?;
io.write_all(&raw).await?;
io.flush().await?;
io.close().await?;
Ok(())
}
}

#[async_trait]
impl request_response::Codec for Codec {
type Protocol = StreamProtocol;
type Request = Envelope;
type Response = Envelope;
type Request = NodeInfo;
type Response = NodeInfo;

async fn read_request<T>(
&mut self,
Expand All @@ -33,14 +73,8 @@ impl request_response::Codec for Codec {
where
T: AsyncRead + Unpin + Send,
{
debug!("reading handsake request");
let mut msg_buf = Vec::new();
let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?;
// TODO potentially try to read one more byte here and create a "message too large error"
debug!(?num_bytes_read, "read handshake request");
let env = Envelope::decode_from_slice(&msg_buf)?;
debug!(?env, "decoded handshake request");
Ok(env)
trace!("reading handshake request");
self.read(io).await
}

async fn read_response<T>(
Expand All @@ -51,17 +85,8 @@ impl request_response::Codec for Codec {
where
T: AsyncRead + Unpin + Send,
{
debug!("reading handshake response");
let mut msg_buf = Vec::new();
// We don't need a varint here because we always read only one message in the protocol.
// In this way we can just read until the end of the stream.
let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?;
debug!(?num_bytes_read, "read handshake response");

let env = parse_envelope(&msg_buf)?;

debug!(?env, "decoded handshake response");
Ok(env)
trace!("reading handshake response");
self.read(io).await
}

async fn write_request<T>(
Expand All @@ -73,12 +98,8 @@ impl request_response::Codec for Codec {
where
T: AsyncWrite + Unpin + Send,
{
debug!(req = ?req, "writing handshake request");
let raw = req.encode_to_vec()?;
io.write_all(&raw).await?;
io.close().await?;
debug!("wrote handshake request");
Ok(())
trace!(req = ?req, "writing handshake request");
self.write(io, req).await
}

async fn write_response<T>(
Expand All @@ -90,13 +111,7 @@ impl request_response::Codec for Codec {
where
T: AsyncWrite + Unpin + Send,
{
debug!("writing handshake response");
let raw = res
.encode_to_vec()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
io.write_all(&raw).await?;
io.close().await?;
debug!("wrote handshake response");
Ok(())
trace!("writing handshake response");
self.write(io, res).await
}
}
1 change: 0 additions & 1 deletion anchor/network/src/handshake/envelope/generated/mod.rs

This file was deleted.

41 changes: 20 additions & 21 deletions anchor/network/src/handshake/envelope/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
mod codec;
mod generated;
mod generated {
pub mod message;
}

use crate::handshake::envelope::Error::SignatureVerification;
use crate::handshake::node_info::NodeInfo;
use discv5::libp2p_identity::PublicKey;
pub use generated::message::pb::Envelope;
use libp2p::identity::DecodingError;
use quick_protobuf::{BytesReader, Error as ProtoError, MessageRead, MessageWrite, Writer};
use thiserror::Error;
Expand All @@ -29,31 +32,31 @@ impl Envelope {
}

/// Decode an Envelope from a Protobuf byte array (like `proto.Unmarshal` in Go).
pub fn decode_from_slice(data: &[u8]) -> Result<Self, Error> {
fn decode_from_slice(data: &[u8]) -> Result<Self, Error> {
let mut reader = BytesReader::from_bytes(data);
let env = Envelope::from_reader(&mut reader, data).map_err(Error::Coding)?;
Ok(env)
}
}

/// Decodes an Envelope and verify signature.
pub fn parse_envelope(bytes: &[u8]) -> Result<Envelope, Error> {
let env = Envelope::decode_from_slice(bytes)?;
/// Decodes an Envelope and verify signature.
pub fn parse_and_verify(bytes: &[u8]) -> Result<Envelope, Error> {
let env = Envelope::decode_from_slice(bytes)?;

let domain = NodeInfo::DOMAIN;
let payload_type = NodeInfo::CODEC;
let domain = NodeInfo::DOMAIN;
let payload_type = NodeInfo::CODEC;

let unsigned = make_unsigned(domain.as_bytes(), payload_type, &env.payload);
let unsigned = make_unsigned(domain.as_bytes(), payload_type, &env.payload);

let pk = PublicKey::try_decode_protobuf(&env.public_key.to_vec())?;
let pk = PublicKey::try_decode_protobuf(&env.public_key.to_vec())?;

if !pk.verify(&unsigned?, &env.signature) {
return Err(SignatureVerification(
"signature verification failed".into(),
));
}
if !pk.verify(&unsigned?, &env.signature) {
return Err(SignatureVerification(
"signature verification failed".into(),
));
}

Ok(env)
Ok(env)
}
}

pub fn make_unsigned(
Expand All @@ -70,7 +73,3 @@ pub fn make_unsigned(
}
Ok(buf)
}

use crate::handshake::envelope::Error::SignatureVerification;
pub use codec::Codec;
pub use generated::message::pb::Envelope;
Loading

0 comments on commit 7378fc0

Please sign in to comment.