Skip to content

Commit

Permalink
feat: use postcard encoding for all transports that require serializa…
Browse files Browse the repository at this point in the history
…tion (#114)
  • Loading branch information
rklaehn authored Nov 29, 2024
2 parents a5606c2 + 1ab8564 commit badb606
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 63 deletions.
42 changes: 41 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ description = "A streaming rpc system based on quic"
rust-version = "1.76"

[dependencies]
bincode = { version = "1.3.3", optional = true }
bytes = { version = "1", optional = true }
derive_more = { version = "1.0.0-beta.6", features = ["from", "try_into", "display"] }
flume = { version = "0.11", optional = true }
Expand All @@ -28,12 +27,14 @@ serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
tokio-serde = { version = "0.9", features = ["bincode"], optional = true }
tokio-util = { version = "0.7", features = ["rt"] }
postcard = { version = "1", features = ["use-std"], optional = true }
tracing = "0.1"
futures = { version = "0.3.30", optional = true }
anyhow = "1.0.73"

# Indirect dependencies, is needed to make the minimal crates versions work
slab = "0.4.9" # iroh-quinn
smallvec = "1.13.2"
time = "0.3.36" # serde

[dev-dependencies]
Expand All @@ -54,10 +55,10 @@ nested_enum_utils = "0.1.0"
tokio-util = { version = "0.7", features = ["rt"] }

[features]
hyper-transport = ["dep:flume", "dep:hyper", "dep:bincode", "dep:bytes", "dep:tokio-serde", "tokio-util/codec"]
quinn-transport = ["dep:flume", "dep:quinn", "dep:bincode", "dep:tokio-serde", "tokio-util/codec"]
hyper-transport = ["dep:flume", "dep:hyper", "dep:postcard", "dep:bytes", "dep:tokio-serde", "tokio-util/codec"]
quinn-transport = ["dep:flume", "dep:quinn", "dep:postcard", "dep:tokio-serde", "tokio-util/codec"]
flume-transport = ["dep:flume"]
iroh-net-transport = ["dep:iroh-net", "dep:flume", "dep:quinn", "dep:bincode", "dep:tokio-serde", "tokio-util/codec"]
iroh-net-transport = ["dep:iroh-net", "dep:flume", "dep:quinn", "dep:postcard", "dep:tokio-serde", "tokio-util/codec"]
macros = []
default = ["flume-transport"]

Expand Down
4 changes: 2 additions & 2 deletions src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> OpenFuture<'a, In, Out> {
}
}

impl<'a, In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'a, In, Out> {
impl<In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'_, In, Out> {
type Output = anyhow::Result<(SendSink<Out>, RecvStream<In>)>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> AcceptFuture<'a, In, Out> {
}
}

impl<'a, In: RpcMessage, Out: RpcMessage> Future for AcceptFuture<'a, In, Out> {
impl<In: RpcMessage, Out: RpcMessage> Future for AcceptFuture<'_, In, Out> {
type Output = anyhow::Result<(SendSink<Out>, RecvStream<In>)>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Expand Down
12 changes: 6 additions & 6 deletions src/transport/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async fn try_forward_all<In: RpcMessage>(
let mut sent = 0;
while let Some(msg) = try_get_length_prefixed(&buffer[sent..]) {
sent += msg.len() + 4;
let item = bincode::deserialize::<In>(msg).map_err(RecvError::DeserializeError);
let item = postcard::from_bytes::<In>(msg).map_err(RecvError::DeserializeError);
if let Err(_cause) = req_tx.send_async(item).await {
// The receiver is gone, so we can't send any more data.
//
Expand Down Expand Up @@ -434,7 +434,7 @@ impl<Out: RpcMessage> SendSink<Out> {
fn serialize(&self, item: Out) -> Result<Bytes, SendError> {
let mut data = Vec::with_capacity(1024);
data.extend_from_slice(&[0u8; 4]);
bincode::serialize_into(&mut data, &item).map_err(SendError::SerializeError)?;
let mut data = postcard::to_extend(&item, data).map_err(SendError::SerializeError)?;
let len = data.len() - 4;
if len > self.config.max_payload_size {
return Err(SendError::SizeError(len));
Expand Down Expand Up @@ -503,8 +503,8 @@ impl<Out: RpcMessage> Sink<Out> for SendSink<Out> {
/// Send error for hyper channels.
#[derive(Debug)]
pub enum SendError {
/// Error when bincode serializing the message.
SerializeError(bincode::Error),
/// Error when postcard serializing the message.
SerializeError(postcard::Error),
/// The message is too large to be sent.
SizeError(usize),
/// The connection has been closed.
Expand All @@ -522,8 +522,8 @@ impl error::Error for SendError {}
/// Receive error for hyper channels.
#[derive(Debug)]
pub enum RecvError {
/// Error when bincode deserializing the message.
DeserializeError(bincode::Error),
/// Error when postcard deserializing the message.
DeserializeError(postcard::Error),
/// Hyper network error.
NetworkError(hyper::Error),
}
Expand Down
14 changes: 7 additions & 7 deletions src/transport/iroh_net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::{sync::oneshot, task::yield_now};
use tracing::{debug_span, Instrument};

use super::{
util::{FramedBincodeRead, FramedBincodeWrite},
util::{FramedPostcardRead, FramedPostcardWrite},
StreamTypes,
};
use crate::{
Expand Down Expand Up @@ -658,12 +658,12 @@ impl<In: RpcMessage, Out: RpcMessage> Connector for IrohNetConnector<In, Out> {
}
}

/// A sink that wraps a quinn SendStream with length delimiting and bincode
/// A sink that wraps a quinn SendStream with length delimiting and postcard
///
/// If you want to send bytes directly, use [SendSink::into_inner] to get the
/// underlying [quinn::SendStream].
#[pin_project]
pub struct SendSink<Out>(#[pin] FramedBincodeWrite<quinn::SendStream, Out>);
pub struct SendSink<Out>(#[pin] FramedPostcardWrite<quinn::SendStream, Out>);

impl<Out> fmt::Debug for SendSink<Out> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -673,7 +673,7 @@ impl<Out> fmt::Debug for SendSink<Out> {

impl<Out: Serialize> SendSink<Out> {
fn new(inner: quinn::SendStream) -> Self {
let inner = FramedBincodeWrite::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardWrite::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down Expand Up @@ -706,12 +706,12 @@ impl<Out: Serialize> Sink<Out> for SendSink<Out> {
}
}

/// A stream that wraps a quinn RecvStream with length delimiting and bincode
/// A stream that wraps a quinn RecvStream with length delimiting and postcard
///
/// If you want to receive bytes directly, use [RecvStream::into_inner] to get
/// the underlying [quinn::RecvStream].
#[pin_project]
pub struct RecvStream<In>(#[pin] FramedBincodeRead<quinn::RecvStream, In>);
pub struct RecvStream<In>(#[pin] FramedPostcardRead<quinn::RecvStream, In>);

impl<In> fmt::Debug for RecvStream<In> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -721,7 +721,7 @@ impl<In> fmt::Debug for RecvStream<In> {

impl<In: DeserializeOwned> RecvStream<In> {
fn new(inner: quinn::RecvStream) -> Self {
let inner = FramedBincodeRead::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardRead::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ pub mod misc;
#[cfg(feature = "quinn-transport")]
pub mod quinn;

#[cfg(any(
feature = "quinn-transport",
feature = "hyper-transport",
feature = "iroh-net-transport"
))]
#[cfg(any(feature = "quinn-transport", feature = "iroh-net-transport"))]
mod util;

/// Errors that can happen when creating and using a [`Connector`] or [`Listener`].
Expand Down
16 changes: 8 additions & 8 deletions src/transport/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::oneshot;
use tracing::{debug_span, Instrument};

use super::{
util::{FramedBincodeRead, FramedBincodeWrite},
util::{FramedPostcardRead, FramedPostcardWrite},
StreamTypes,
};
use crate::{
Expand Down Expand Up @@ -578,7 +578,7 @@ impl<'a, T> Receiver<'a, T> {
}
}

impl<'a, T> Stream for Receiver<'a, T> {
impl<T> Stream for Receiver<'_, T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -653,12 +653,12 @@ impl<In: RpcMessage, Out: RpcMessage> Connector for QuinnConnector<In, Out> {
}
}

/// A sink that wraps a quinn SendStream with length delimiting and bincode
/// A sink that wraps a quinn SendStream with length delimiting and postcard
///
/// If you want to send bytes directly, use [SendSink::into_inner] to get the
/// underlying [quinn::SendStream].
#[pin_project]
pub struct SendSink<Out>(#[pin] FramedBincodeWrite<quinn::SendStream, Out>);
pub struct SendSink<Out>(#[pin] FramedPostcardWrite<quinn::SendStream, Out>);

impl<Out> fmt::Debug for SendSink<Out> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -668,7 +668,7 @@ impl<Out> fmt::Debug for SendSink<Out> {

impl<Out: Serialize> SendSink<Out> {
fn new(inner: quinn::SendStream) -> Self {
let inner = FramedBincodeWrite::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardWrite::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down Expand Up @@ -710,12 +710,12 @@ impl<Out: Serialize> Sink<Out> for SendSink<Out> {
}
}

/// A stream that wraps a quinn RecvStream with length delimiting and bincode
/// A stream that wraps a quinn RecvStream with length delimiting and postcard
///
/// If you want to receive bytes directly, use [RecvStream::into_inner] to get
/// the underlying [quinn::RecvStream].
#[pin_project]
pub struct RecvStream<In>(#[pin] FramedBincodeRead<quinn::RecvStream, In>);
pub struct RecvStream<In>(#[pin] FramedPostcardRead<quinn::RecvStream, In>);

impl<In> fmt::Debug for RecvStream<In> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -725,7 +725,7 @@ impl<In> fmt::Debug for RecvStream<In> {

impl<In: DeserializeOwned> RecvStream<In> {
fn new(inner: quinn::RecvStream) -> Self {
let inner = FramedBincodeRead::new(inner, MAX_FRAME_LENGTH);
let inner = FramedPostcardRead::new(inner, MAX_FRAME_LENGTH);
Self(inner)
}
}
Expand Down
Loading

0 comments on commit badb606

Please sign in to comment.