Skip to content

Commit

Permalink
implement tcp, quic
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Jan 31, 2024
1 parent 329fc52 commit 406b5cd
Show file tree
Hide file tree
Showing 12 changed files with 1,095 additions and 14 deletions.
816 changes: 816 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
resolver = "2"
members = ["broker-proto", "broker", "broker-client"]

[workspace.dependencies]
tokio = { version = "1.35.1", features = ["full"] }
4 changes: 3 additions & 1 deletion broker-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ capnpc = "0.19.0"

[dependencies]
capnp = "0.19.1"
thiserror = "1.0.56"
thiserror = "1.0.56"
quinn = "0.10.2"
tokio.workspace = true
23 changes: 22 additions & 1 deletion broker-proto/src/connection/fallible/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
/// This module defines fallible networks and their implementations.
//! This module defines connections and their implementations.

use std::sync::Arc;

use crate::{error::Result, message::Message};

pub mod quic;
pub mod tcp;

/// Assertion that we are at _least_ running on a 32-bit system
/// TODO: find out if there is a better way than the `u32` cast
const _: [(); 0 - (!(usize::BITS >= u32::BITS)) as usize] = [];

trait Connection {
/// Receive a single message from the connection.
///
/// # Errors
/// Errors if we either fail to receive the message. This usually means a connection problem.
async fn recv_message(&self) -> Result<Message>;

/// Send a single message over the connection.
///
/// # Errors
/// Errors if we fail to deliver the message. This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()>;
}
117 changes: 117 additions & 0 deletions broker-proto/src/connection/fallible/quic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! This file defines and implements a thin wrapper around a QUIC
//! connection that implements our message framing and connection
//! logic.

use crate::{
bail,
error::{Error, Result},
message::Message,
MAX_MESSAGE_SIZE,
};
use core::hash::Hash;
use std::sync::Arc;

use super::Connection;

/// `Fallible` is a thin wrapper around `quinn::Connection` that implements
/// `Connection`.
#[derive(Clone)]
pub struct Fallible(pub quinn::Connection);

/// `PartialEq` for a `Fallible` connection is determined by the `stable_id` since it
/// will not change for the duration of the connection.
impl PartialEq for Fallible {
fn eq(&self, other: &Self) -> bool {
self.0.stable_id() == other.0.stable_id()
}
}

/// Assertion for `Fallible` that `PartialEq` == `Eq`
impl Eq for Fallible {
fn assert_receiver_is_total_eq(&self) {}
}

/// `Hash` for a `Fallible` connection is determined by the `stable_id` since it
/// will not change for the duration of the connection. We just want to hash that.
impl Hash for Fallible {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.stable_id().hash(state);
}

/// This just calls `hash` on each item in the slice.
fn hash_slice<H: std::hash::Hasher>(data: &[Self], state: &mut H)
where
Self: Sized,
{
data.iter().for_each(|item| item.hash(state));
}
}

impl Connection for Fallible {
/// Receives a single message from the QUIC connection. Since we use
/// virtual streams as a message framing method, this function first accepts a stream
/// and then reads and deserializes a single message from it.
///
/// # Errors
/// Errors if we either failed to accept the stream or receive the message over that stream.
/// This usually means a connection problem.
async fn recv_message(&self) -> Result<Message> {
// Accept the incoming unidirectional stream
let mut stream = bail!(
self.0.accept_uni().await,
ConnectionError,
"failed to accept unidirectional stream"
);

// Read the full message, until the sender closes the stream
let message_bytes = bail!(
stream.read_to_end(MAX_MESSAGE_SIZE as usize).await,
ConnectionError,
"failed to read from stream"
);

// Deserialize and return the message
Ok(bail!(
Message::deserialize(&message_bytes),
DeserializeError,
"failed to deserialize message"
))
}

/// Sends a single message to the QUIC connection. This function first opens a
/// stream and then serializes and sends a single message to it.
///
/// # Errors
/// Errors if we either failed to open the stream or send the message over that stream.
/// This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()> {
// Open the outgoing unidirectional stream
let mut stream = bail!(
self.0.open_uni().await,
ConnectionError,
"failed to open unidirectional stream"
);

// Serialize the message
let message_bytes = bail!(
message.serialize(),
SerializeError,
"failed to serialize message"
);

// Write the full message to the stream
bail!(
stream.write_all(&message_bytes).await,
ConnectionError,
"failed to write to stream"
);

// Finish the stream, denoting to the peer that the
// message has been fully written
Ok(bail!(
stream.finish().await,
ConnectionError,
"failed to finish stream"
))
}
}
106 changes: 106 additions & 0 deletions broker-proto/src/connection/fallible/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! This file defines and implements a thin wrapper around a TCP
//! connection that implements our message framing and connection
//! logic.

use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
sync::Mutex,
};

use crate::{
bail,
error::{Error, Result},
message::Message,
MAX_MESSAGE_SIZE,
};
use std::sync::Arc;

use super::Connection;

/// `Fallible` is a thin wrapper around `OwnedReadHalf` and `OwnedWriteHalf` that implements
/// `Connection`.
#[derive(Clone)]
pub struct Fallible {
pub receiver: Arc<Mutex<OwnedReadHalf>>,
pub sender: Arc<Mutex<OwnedWriteHalf>>,
}

impl Connection for Fallible {
/// Receives a single message from the TCP connection. It reads the size
/// of the message from the stream, reads the message, and then
/// deserializes and returns it.
///
/// # Errors
/// Errors if we either failed to receive or deserialize the message.
/// This usually means a connection problem.
async fn recv_message(&self) -> Result<Message> {
// Lock the stream so we don't receive message/message sizes interleaved
let mut receiver_guard = self.receiver.lock().await;

// Read the message size from the stream
let message_size = bail!(
receiver_guard.read_u32().await,
ConnectionError,
"failed to read message size"
);
// Make sure the message isn't too big
if message_size > MAX_MESSAGE_SIZE {
return Err(Error::ConnectionError(
"expected to receive message that was too big".to_string(),
));
}

// Create buffer of the proper size
let mut buffer = vec![0; message_size as usize];
// Read the message from the stream
bail!(
receiver_guard.read_exact(&mut buffer).await,
ConnectionError,
"failed to receive message from connection"
);

// Deserialize and return the message
Ok(bail!(
Message::deserialize(&buffer),
DeserializeError,
"failed to deserialize message"
))
}

/// Sends a single message to the QUIC connection. This function first opens a
/// stream and then serializes and sends a single message to it.
///
/// # Errors
/// Errors if we either failed to open the stream or send the message over that stream.
/// This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()> {
// Lock the stream so we don't send message/message sizes interleaved
let mut sender_guard = self.sender.lock().await;

// Serialize the message
let serialized_message = bail!(
message.serialize(),
SerializeError,
"failed to serialize message"
);

// Write the message size to the stream
bail!(
sender_guard
.write_u32(serialized_message.len() as u32)
.await,
ConnectionError,
"failed to send message size"
);

// Write the message to the stream
bail!(
sender_guard.write_all(&serialized_message).await,
ConnectionError,
"failed to send message"
);

Ok(())
}
}
4 changes: 1 addition & 3 deletions broker-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
/// This module defines network connections, their types, and their implementations.

pub mod fallible;
pub mod infallible;
pub mod sticky;
File renamed without changes.
2 changes: 1 addition & 1 deletion broker-proto/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/// This file defines common errors used by CDN clients and servers.
//! This file defines common errors used by CDN clients and servers.

use core::result::Result as StdResult;
use thiserror::Error;
Expand Down
23 changes: 21 additions & 2 deletions broker-proto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
/// This crate defines the common code structures used by both the
/// broker client and server.
//! This crate defines the common code structures and constants used by both the
//! broker client and server.

pub mod connection;
pub mod error;
pub mod message;
pub mod wal;

/// Common constants used in both the client and server
///
/// The maximum message size to be received over a connection.
/// After this, it will be automatically closed by the receiver.
pub const MAX_MESSAGE_SIZE: u32 = 1024 * 1024 * 1024;

/// The maximum amount of concurrent QUIC streams (messages) that can be opened.
/// Having a value that is too high can degrade performance.
pub const QUIC_MAX_CONCURRENT_STREAMS: u64 = 10;

/// Specifies the number of subsequent connection failures before we try
/// to connect to the next server address, if applicable (we might be
/// using a single server address).
pub const SUBSEQUENT_CONNECTION_FAILURES_BEFORE_NEXT_ADDRESS: usize = 3;

/// Specifies the number of subsequent operation (message send, authentication, etc)
/// failures before we try to reconnect to the current (or next) server.
pub const SUBSEQUENT_OPERATION_FAILURES_BEFORE_NEXT_RECONNECT: usize = 3;

/// Include the built capnp-rust bindings
#[allow(clippy::all, clippy::pedantic, clippy::restriction, clippy::nursery)]
pub mod messages_capnp {
Expand Down
6 changes: 3 additions & 3 deletions broker-proto/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// The message serialization and deserialization layer. Used by all
/// messages sent to/from a broker or user.
/// TODO: clean up. Maybe use Cap'n'Proto messages directly.
//! The message serialization and deserialization layer. Used by all
//! messages sent to/from a broker or user.
//! TODO: clean up. Maybe use Cap'n'Proto messages directly.

use capnp::{
message::ReaderOptions,
Expand Down
6 changes: 3 additions & 3 deletions broker-proto/src/wal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// This file defines a write-ahead log that we use for broker <-> broker replication.
/// It does not write to disk, but contains the primitive that allows us to
/// catch up from a snapshot or from a list of logs.
//! This file defines a write-ahead log that we use for broker <-> broker replication.
//! It does not write to disk, but contains the primitive that allows us to
//! catch up from a snapshot or from a list of logs.

use std::{
collections::{HashSet, VecDeque},
Expand Down

0 comments on commit 406b5cd

Please sign in to comment.