Skip to content

Commit

Permalink
Changed to use progress write wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
manforowicz committed Mar 31, 2024
1 parent 1654be1 commit 2fcd76c
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 86 deletions.
47 changes: 17 additions & 30 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,43 +1,30 @@
- Have the hole puncher time out and return a helpful error after a given amount of time.
- Have the hole puncher prefer local sockets over public sockets.
# To-Do's
Items to consider implementing. Not all of them are desirable or necessary.
These are just some quick notes that might not make sense.

- Have the hole puncher actively prefer local sockets over public sockets.
But I don't think this matters much since
most NATs don't support hairpin translation, and if they do, I doubt its much slower than a direct connection.

- Deduplicate errors in the error vec the hole puncher can return.
This might give a more helpful error message to the user.

- Give the client and server the option to use plain TCP instead of TLS.
This might be difficult because various inner functions require a get address function.
Maybe I can create a trait that allows for such a function call, and implement this trait
for both raw TCP and TLS? That sounds overly complicated, but maybe it's the only option?
Or potentially just pass an address parameter everywhere??

- Can the hole puncher even be used standalone from contact exchange? Yeah, I suppose
if someone is trying to reconnect or something.
- Restructure the hole puncher to force keeping connection to server open
during hole-punching. That might please some NATs that lose state when TCP connection is closed.

- Make peer authentication not "block" hole punching.

- Potentially keep connection to server open during hole punching?

"Blocking" might be an issue when the peer is receiving other
incoming connections. But this probably won't happen unless
the peer's device is acting as some sort of server.

- Improve error message everywhere in general. Have helpful tips to the user.
- Add checks in the file transfer to avoid TOCTOU bugs.
- Change the progress bar to use indicatif's built-in wrap write.

# Hole punching idea

Ok, here's my genius new idea:

the `get_contact(..., time_limit)` function will try and:

- If a local <-> local connection is authenticated, return early.
- Otherwise, keep trying until the `time_limit` is reached.
- If success, return that TCP connection and corresponding secret key.
- Otherwise, return a struct that gives detailed information about the attempts.

## The error struct

This struct will have a field for each attempted connection (v4 private and public, v6 private and public),
as well as a field for the v4 listener and the v6 listener.
- Add checks in the file transfer to avoid TOCTOU bugs.

Each of these will store an enum of:
- Didn't try connecting from here because no v4 or v6 was used.
- IO Error
- Established TCP connection, but didn't receive any messages on it.
- Established TCP connection, but received invalid messages.
- Established TCP connection, but the peer's shared secret was incorrect.
- Change the progress bar to use indicatif's built-in wrap write.
27 changes: 17 additions & 10 deletions gday/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn main() {
fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
// use custom server if the user provided one,
// otherwise pick a random default server
let (server_connection, server_id) = if let Some(domain_name) = args.server {
let (mut server_connection, server_id) = if let Some(domain_name) = args.server {
(server_connector::connect_to_domain_name(&domain_name)?, 0)
} else {
server_connector::connect_to_random_server(DEFAULT_SERVERS)?
Expand Down Expand Up @@ -114,7 +114,7 @@ fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {

// create a room in the server
let (contact_sharer, my_contact) =
ContactSharer::create_room(room_code, server_connection)?;
ContactSharer::create_room(room_code, &mut server_connection)?;

info!("Your contact is:\n{my_contact}");

Expand Down Expand Up @@ -149,21 +149,26 @@ fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
// offer these files to the peer
to_writer(FileOfferMsg { files }, &mut stream)?;

info!("Waiting for peer to respond to file offer.");
println!("Waiting for your mate to respond to your file offer.");

// receive file offer from peer
let response: FileResponseMsg = from_reader(&mut stream)?;

info!("Starting file send.");

transfer::send_files(&mut stream, &local_files, &response.accepted)?;
if response.accepted.iter().all(|x| x.is_none()) {
println!("Your mate rejected all your offered files.");
} else {
if !response.accepted.iter().filter_map(|x| *x).all(|x| x == 0) {
println!("Resuming transfer of interrupted file(s).");
}
transfer::send_files(&mut stream, &local_files, &response.accepted)?;
}
}

// receiving files
Command::Receive { code } => {
let code = PeerCode::from_str(&code)?;
let (contact_sharer, my_contact) =
ContactSharer::join_room(code.room_code, server_connection)?;
ContactSharer::join_room(code.room_code, &mut server_connection)?;

info!("Your contact is:\n{my_contact}");

Expand Down Expand Up @@ -197,9 +202,11 @@ fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
&mut stream,
)?;

info!("Starting file download.");

transfer::receive_files(&mut stream, &offer.files, &accepted)?;
if accepted.iter().all(|x| x.is_none()) {
println!("No files will be downloaded.");
} else {
transfer::receive_files(&mut stream, &offer.files, &accepted)?;
}
}
}

Expand Down
50 changes: 10 additions & 40 deletions gday/src/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use gday_encryption::EncryptedStream;
use gday_file_offer_protocol::{FileMeta, FileMetaLocal};
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};

const FILE_BUFFER_SIZE: usize = 1_000_000;

Expand Down Expand Up @@ -51,6 +51,9 @@ pub fn send_files(
// create a progress bar object
let progress = create_progress_bar(size);

// wrap the writer in progress bar
let mut writer = progress.wrap_write(writer);

// iterate over all the files
for (meta, &accepted) in offered.iter().zip(accepted) {
if let Some(start) = accepted {
Expand All @@ -64,25 +67,18 @@ pub fn send_files(
// TODO: maybe check if file length is correct?

file.seek(SeekFrom::Start(start))?;
let mut writer = ProgressWrite {
writer,
progress: &progress,
};
std::io::copy(&mut file, &mut writer)?;
}
}

// flush the writer
writer.flush()?;

progress.finish_with_message("Done sending!");

Ok(())
}

/// Sequentially save the given `files` from this `reader`.
pub fn receive_files(
reader: &mut impl Read,
reader: &mut impl BufRead,
offered: &[FileMeta],
accepted: &[Option<u64>],
) -> Result<(), gday_file_offer_protocol::Error> {
Expand Down Expand Up @@ -125,11 +121,8 @@ pub fn receive_files(
// only take the length of the file from the reader
let mut reader = reader.take(meta.len);

// wrap the file to track write progress
let mut writer = ProgressWrite {
writer: &mut file,
progress: &progress,
};
// wrap the writer in progress bar
let mut writer = progress.wrap_write(&mut file);

// copy from the reader into the file
std::io::copy(&mut reader, &mut writer)?;
Expand All @@ -148,16 +141,13 @@ pub fn receive_files(
// open the partially downloaded file in append mode
let tmp_path = meta.get_prefixed_save_path(TMP_DOWNLOAD_PREFIX.into())?;
let file = OpenOptions::new().append(true).open(&tmp_path)?;
let mut file = BufWriter::with_capacity(FILE_BUFFER_SIZE, file);
let file = BufWriter::with_capacity(FILE_BUFFER_SIZE, file);

// only take the length of the remaining part of the file from the reader
let mut reader = reader.take(meta.len - start);

// wrap the file to track write progress
let mut writer = ProgressWrite {
writer: &mut file,
progress: &progress,
};
// wrap the writer in progress bar
let mut writer = progress.wrap_write(file);

// copy from the reader into the file
std::io::copy(&mut reader, &mut writer)?;
Expand All @@ -184,23 +174,3 @@ fn create_progress_bar(bytes: u64) -> ProgressBar {
.with_style(style)
.with_message("starting...")
}

/// A thin wrapper around a [`Write`] IO stream and [`ProgressBar`].
/// Increments the [`ProgressBar`] by the number of bytes written.
struct ProgressWrite<'a, T: Write> {
writer: &'a mut T,
progress: &'a ProgressBar,
}

impl<'a, T: Write> Write for ProgressWrite<'a, T> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let bytes_written = self.writer.write(buf)?;

self.progress.inc(bytes_written as u64);
Ok(bytes_written)
}

fn flush(&mut self) -> std::io::Result<()> {
self.writer.flush()
}
}
12 changes: 6 additions & 6 deletions gday_hole_punch/src/contact_sharer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use crate::{server_connector::ServerConnection, Error};
use gday_contact_exchange_protocol::{from_reader, to_writer, ClientMsg, FullContact, ServerMsg};

/// Used to exchange socket addresses with a peer via a Gday server.
pub struct ContactSharer {
pub struct ContactSharer<'a> {
room_code: u64,
is_creator: bool,
connection: ServerConnection,
connection: &'a mut ServerConnection,
}

impl ContactSharer {
impl<'a> ContactSharer<'a> {
/// Creates a new room with `room_code` in the Gday server
/// that `server_connection` connects to.
///
Expand All @@ -20,7 +20,7 @@ impl ContactSharer {
/// determined by the server
pub fn create_room(
room_code: u64,
mut server_connection: ServerConnection,
server_connection: &'a mut ServerConnection,
) -> Result<(Self, FullContact), Error> {
server_connection.configure()?;

Expand Down Expand Up @@ -55,7 +55,7 @@ impl ContactSharer {
/// determined by the server
pub fn join_room(
room_code: u64,
server_connection: ServerConnection,
server_connection: &'a mut ServerConnection,
) -> Result<(Self, FullContact), Error> {
server_connection.configure()?;

Expand Down Expand Up @@ -109,7 +109,7 @@ impl ContactSharer {
/// Blocks until the Gday server sends the contact information the
/// other peer submitted. Returns the peer's [`FullContact`], as
/// determined by the server
pub fn get_peer_contact(mut self) -> Result<FullContact, Error> {
pub fn get_peer_contact(self) -> Result<FullContact, Error> {
let stream = &mut self.connection.streams()[0];
let reply: ServerMsg = from_reader(stream)?;
let ServerMsg::PeerContact(peer) = reply else {
Expand Down
2 changes: 2 additions & 0 deletions gday_hole_punch/src/hole_puncher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type PeerConnection = (std::net::TcpStream, [u8; 32]);

const RETRY_INTERVAL: Duration = Duration::from_millis(100);



// TODO: ADD BETTER ERROR REPORTING.
// add a timeout.
// if fails, specify if it failed on connecting to peer, or verifying peer.
Expand Down

0 comments on commit 2fcd76c

Please sign in to comment.