Skip to content

Commit

Permalink
Added timeout to hole punch
Browse files Browse the repository at this point in the history
  • Loading branch information
manforowicz committed Mar 31, 2024
1 parent 5d14fc9 commit 1654be1
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 52 deletions.
30 changes: 29 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,39 @@
This might be difficult because various inner functions require a get address function.
Maybe I can create a trait that allows for such a function call, and implement this trait
for both raw TCP and TLS? That sounds overly complicated, but maybe it's the only option?
Or potentially just pass an address parameter everywhere??

- Can the hole puncher even be used standalone from contact exchange? Yeah, I suppose
if someone is trying to reconnect or something.

- Make peer authentication not "block" hole punching.

- Potentially keep connection to server open during hole punching?


- Improve error message everywhere in general. Have helpful tips to the user.
- Add checks in the file transfer to avoid TOCTOU bugs.
- Add checks in the file transfer to avoid TOCTOU bugs.
- Change the progress bar to use indicatif's built-in wrap write.

# Hole punching idea

Ok, here's my genius new idea:

the `get_contact(..., time_limit)` function will try and:

- If a local <-> local connection is authenticated, return early.
- Otherwise, keep trying until the `time_limit` is reached.
- If success, return that TCP connection and corresponding secret key.
- Otherwise, return a struct that gives detailed information about the attempts.

## The error struct

This struct will have a field for each attempted connection (v4 private and public, v6 private and public),
as well as a field for the v4 listener and the v6 listener.

Each of these will store an enum of:
- Didn't try connecting from here because no v4 or v6 was used.
- IO Error
- Established TCP connection, but didn't receive any messages on it.
- Established TCP connection, but received invalid messages.
- Established TCP connection, but the peer's shared secret was incorrect.
4 changes: 4 additions & 0 deletions gday/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use gday_file_offer_protocol::{to_writer, FileMeta, FileOfferMsg};

const TMP_DOWNLOAD_PREFIX: &str = "GDAY_PARTIAL_DOWNLOAD_";

const HOLE_PUNCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

/// Send files directly to peers
#[derive(Parser, Debug)]
#[command(author, version, about)]
Expand Down Expand Up @@ -131,6 +133,7 @@ fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
my_contact.private,
peer_contact,
&shared_secret.to_be_bytes(),
HOLE_PUNCH_TIMEOUT,
)?;

let mut stream = encrypt_connection(stream, &shared_key, true)?;
Expand Down Expand Up @@ -173,6 +176,7 @@ fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
my_contact.private,
peer_contact,
&code.shared_secret.to_be_bytes(),
HOLE_PUNCH_TIMEOUT,
)?;

let mut stream = encrypt_connection(stream, &shared_key, false)?;
Expand Down
3 changes: 1 addition & 2 deletions gday_contact_exchange_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use std::{
io::{Read, Write},
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// A message from client to server.
Expand Down Expand Up @@ -223,7 +222,7 @@ pub async fn deserialize_from_async<T: DeserializeOwned>(
}

/// Message serialization/deserialization error
#[derive(Error, Debug)]
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum Error {
#[error("JSON error encoding/decoding message: {0}")]
Expand Down
1 change: 0 additions & 1 deletion gday_hole_punch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ license = "MIT"

[dependencies]
blake3 = "1.5.1"
futures = "0.3.30"
gday_contact_exchange_protocol = { version = "0.1.0", path = "../gday_contact_exchange_protocol" }
log = "0.4.21"
rand = "0.8.5"
Expand Down
186 changes: 138 additions & 48 deletions gday_hole_punch/src/hole_puncher.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::Error;
use gday_contact_exchange_protocol::{Contact, FullContact};
use log::{debug, trace};
use socket2::{SockRef, TcpKeepalive};
use spake2::{Ed25519Group, Identity, Password, Spake2};
use std::{future::Future, net::SocketAddr, pin::Pin, time::Duration};
use std::{net::SocketAddr, time::Duration};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpSocket,
Expand Down Expand Up @@ -33,78 +34,159 @@ pub fn try_connect_to_peer(
local_contact: Contact,
peer_contact: FullContact,
shared_secret: &[u8],
) -> std::io::Result<PeerConnection> {
timeout: std::time::Duration,
) -> Result<PeerConnection, HolePunchErrors> {
// time at which to give up hole punching
let end_time = tokio::time::Instant::now() + timeout;

// shorten the variable name for conciseness
let p = shared_secret;
let mut futs: Vec<Pin<Box<dyn Future<Output = std::io::Result<PeerConnection>>>>> =
Vec::with_capacity(6);

if let Some(local) = local_contact.v4 {
futs.push(Box::pin(try_accept(local, p)));
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.expect("Tokio async runtime error.");

if let Some(peer) = peer_contact.private.v4 {
futs.push(Box::pin(try_connect(local, peer, p)));
}
// hole punch asynchronously
runtime.block_on(async {
let mut futs = tokio::task::JoinSet::new();
if let Some(local) = local_contact.v4 {
futs.spawn(try_accept(local, p.to_vec(), end_time));

if let Some(peer) = peer_contact.public.v4 {
futs.push(Box::pin(try_connect(local, peer, p)));
if let Some(peer) = peer_contact.private.v4 {
futs.spawn(try_connect(local, peer, p.to_vec(), end_time));
}

if let Some(peer) = peer_contact.public.v4 {
futs.spawn(try_connect(local, peer, p.to_vec(), end_time));
}
}
}

if let Some(local) = local_contact.v6 {
futs.push(Box::pin(try_accept(local, p)));
if let Some(local) = local_contact.v6 {
futs.spawn(try_accept(local, p.to_vec(), end_time));

if let Some(peer) = peer_contact.private.v6 {
futs.push(Box::pin(try_connect(local, peer, p)));
if let Some(peer) = peer_contact.private.v6 {
futs.spawn(try_connect(local, peer, p.to_vec(), end_time));
}
if let Some(peer) = peer_contact.public.v6 {
futs.spawn(try_connect(local, peer, p.to_vec(), end_time));
}
}
if let Some(peer) = peer_contact.public.v6 {
futs.push(Box::pin(try_connect(local, peer, p)));
let mut errors = Vec::new();
trace!("Starting hole-punching");
while let Some(outcome) = futs.join_next().await {
match outcome.expect("Async error") {
Ok(connection) => return Ok(connection),
Err(err) => errors.push(err),
}
}
}

let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()?;

Ok(runtime.block_on(futures::future::select_ok(futs))?.0)
Err(HolePunchErrors {errors})
})
}

/// Tries to connect to a socket address.
/// Tries to TCP connect to `peer` from `local`.
/// Returns the most recent error if not successful by `end_time`.
async fn try_connect<T: Into<SocketAddr>>(
local: T,
peer: T,
shared_secret: &[u8],
) -> std::io::Result<PeerConnection> {
shared_secret: Vec<u8>,
end_time: tokio::time::Instant,
) -> Result<PeerConnection, Error> {
let local = local.into();
let peer = peer.into();
loop {
tokio::time::sleep(RETRY_INTERVAL).await;
let local_socket = get_local_socket(local)?;
let Ok(stream) = local_socket.connect(peer).await else {
continue;
};
if let Ok(connection) = verify_peer(shared_secret, stream).await {
return Ok(connection);
let mut last_error = Error::HolePunchTimeout;
let mut interval = tokio::time::interval(RETRY_INTERVAL);

trace!("Trying to connect from {local} to {peer}.");

while tokio::time::Instant::now() < end_time {
// try connecting
match tokio::time::timeout_at(end_time, try_connect_once(local, peer, &shared_secret)).await
{
// return successfully connection
Ok(Ok(connection)) => return Ok(connection),
// update `last_error`
Ok(Err(err)) => {
debug!("Error when trying to connect from {local} to {peer}: {err}");
last_error = err;
}
// passed `end_time`
Err(..) => break,
}

// wait some time to avoid flooding the network
match tokio::time::timeout_at(end_time, interval.tick()).await {
// done waiting
Ok(..) => (),
// passed `end_time`
Err(..) => break,
};
}
Err(last_error)
}

/// Tries to accept a connection from a socket address.
/// Tries to accept a peer TCP connection on `local`.
/// Returns the most recent error if not successful by `end_time`.
async fn try_accept(
local: impl Into<SocketAddr>,
shared_secret: &[u8],
) -> std::io::Result<PeerConnection> {
let local_socket = get_local_socket(local.into())?;
shared_secret: Vec<u8>,
end_time: tokio::time::Instant,
) -> Result<PeerConnection, Error> {
let local = local.into();
trace!("Trying accept peer's connection on {local}.");
let local_socket = get_local_socket(local)?;
let listener = local_socket.listen(1024)?;
loop {
tokio::time::sleep(RETRY_INTERVAL).await;
let Ok((stream, _addr)) = listener.accept().await else {
continue;
};
if let Ok(connection) = verify_peer(shared_secret, stream).await {
return Ok(connection);
let mut last_error = Error::HolePunchTimeout;
let mut interval = tokio::time::interval(RETRY_INTERVAL);

while tokio::time::Instant::now() < end_time {
// try accepting
match tokio::time::timeout_at(end_time, try_accept_once(&listener, &shared_secret)).await {
// return successful connection
Ok(Ok(connection)) => return Ok(connection),
// update `last_error`
Ok(Err(err)) => {
debug!("Error when trying to accept peer's connection on {local}: {err}");
last_error = err;
}
// passed `end_time`
Err(..) => break,
}

// wait some time to avoid flooding the network
match tokio::time::timeout_at(end_time, interval.tick()).await {
// done waiting
Ok(..) => (),
// passed `end_time`
Err(..) => break,
};
}
Err(last_error)
}

async fn try_connect_once(
local: SocketAddr,
peer: SocketAddr,
shared_secret: &[u8],
) -> Result<PeerConnection, Error> {
let local_socket = get_local_socket(local)?;
let stream = local_socket.connect(peer).await?;
debug!("Connected to {peer} from {local}. Will try to authenticate.");
verify_peer(shared_secret, stream).await
}

async fn try_accept_once(
listener: &tokio::net::TcpListener,
shared_secret: &[u8],
) -> Result<PeerConnection, Error> {
let (stream, addr) = listener.accept().await?;
debug!(
"Connected from {} to {}. Will try to authenticate.",
addr,
stream.local_addr()?
);
verify_peer(shared_secret, stream).await
}

/// Uses [SPAKE 2](https://docs.rs/spake2/latest/spake2/)
Expand Down Expand Up @@ -133,6 +215,8 @@ async fn verify_peer(
.try_into()
.expect("Unreachable: Key is always 32 bytes long.");

debug!("Derived a strong key with the peer. Will now verify we both have the same key.");

//// Mutually verify that we have the same `shared_key` ////

// send a random challenge to the peer
Expand Down Expand Up @@ -195,3 +279,9 @@ fn get_local_socket(local_addr: SocketAddr) -> std::io::Result<TcpSocket> {
socket.bind(local_addr)?;
Ok(socket)
}

#[derive(thiserror::Error, Debug)]
#[error("Hole punching unsuccessful: {:#?}", errors)]
pub struct HolePunchErrors {
errors: Vec<crate::Error>
}
3 changes: 3 additions & 0 deletions gday_hole_punch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ pub enum Error {

#[error("Invalid DNS name: {0}")]
InvalidDNSName(#[from] rustls::pki_types::InvalidDnsNameError),

#[error("Couldn't hole-punch peer-to-peer connection in under the timeout.")]
HolePunchTimeout,
}

0 comments on commit 1654be1

Please sign in to comment.