Skip to content

Commit

Permalink
feat(codec): revert to tokio-util traits
Browse files Browse the repository at this point in the history
Pull request hyperium#208 introduced `tonic::codec::{DecodeBuf, Decoder, EncodeBuf, Encoder}`
in preparation for experimenting with a different decoding strategy, but as it
currently stands the in-tree functionality is equivalent to the original
implementation using `tokio_util::codec::{Decoder, Encoder}` and
`bytes::BytesMut`. Given that a significant amount of time has passed with no
experimentation having happened (as far as I can tell), this commit reverts
those changes. Additionally, it also restores efficient use of the underlying
buffers, an issue that was brought up later in its respective comment thread.
  • Loading branch information
themaxdavitt committed Sep 23, 2021
1 parent ddab65e commit 3ddb479
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 199 deletions.
11 changes: 5 additions & 6 deletions tonic/benches/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};
use tonic::{codec::Decoder, Status, Streaming};

macro_rules! bench {
($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => {
Expand Down Expand Up @@ -97,13 +97,12 @@ impl MockDecoder {
}

impl Decoder for MockDecoder {
type Item = Vec<u8>;
type Item = Bytes;
type Error = Status;

fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
let out = Vec::from(buf.chunk());
buf.advance(self.message_size);
Ok(Some(out))
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let item = buf.split_to(self.message_size).freeze();
Ok(Some(item))
}
}

Expand Down
121 changes: 0 additions & 121 deletions tonic/src/codec/buffer.rs

This file was deleted.

11 changes: 3 additions & 8 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "compression")]
use super::compression::{decompress, CompressionEncoding};
use super::{DecodeBuf, Decoder, HEADER_SIZE};
use super::{Decoder, HEADER_SIZE};
use crate::{body::BoxBody, metadata::MetadataMap, Code, Status};
use bytes::{Buf, BufMut, BytesMut};
use futures_core::Stream;
Expand Down Expand Up @@ -266,18 +266,13 @@ impl<T> Streaming<T> {
};
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
self.decoder.decode(&mut DecodeBuf::new(
&mut self.decompress_buf,
decompressed_len,
))
self.decoder.decode(&mut self.decompress_buf)
}

#[cfg(not(feature = "compression"))]
unreachable!("should not take this branch if compression is disabled")
} else {
self.decoder
.decode(&mut DecodeBuf::new(&mut self.buf, *len))
self.decoder.decode(&mut self.buf)
};

return match decoding_result {
Expand Down
28 changes: 14 additions & 14 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "compression")]
use super::compression::{compress, CompressionEncoding, SingleMessageCompressionOverride};
use super::{EncodeBuf, Encoder, HEADER_SIZE};
use super::{Encoder, HEADER_SIZE};
use crate::{Code, Status};
use bytes::{BufMut, Bytes, BytesMut};
use futures_core::{Stream, TryStream};
Expand All @@ -15,16 +15,16 @@ use std::{

pub(super) const BUFFER_SIZE: usize = 8 * 1024;

pub(crate) fn encode_server<T, U>(
pub(crate) fn encode_server<T, I, U>(
encoder: T,
source: U,
#[cfg(feature = "compression")] compression_encoding: Option<CompressionEncoding>,
#[cfg(feature = "compression")] compression_override: SingleMessageCompressionOverride,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
T: Encoder<Error = Status> + Send + Sync + 'static,
T::Item: Send + Sync,
U: Stream<Item = Result<T::Item, Status>> + Send + Sync + 'static,
T: Encoder<I, Error = Status> + Send + Sync + 'static,
I: Send + Sync + 'static,
U: Stream<Item = Result<I, Status>> + Send + Sync + 'static,
{
let stream = encode(
encoder,
Expand All @@ -39,15 +39,15 @@ where
EncodeBody::new_server(stream)
}

pub(crate) fn encode_client<T, U>(
pub(crate) fn encode_client<T, I, U>(
encoder: T,
source: U,
#[cfg(feature = "compression")] compression_encoding: Option<CompressionEncoding>,
) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
where
T: Encoder<Error = Status> + Send + Sync + 'static,
T::Item: Send + Sync,
U: Stream<Item = T::Item> + Send + Sync + 'static,
T: Encoder<I, Error = Status> + Send + Sync + 'static,
I: Send + Sync + 'static,
U: Stream<Item = I> + Send + Sync + 'static,
{
let stream = encode(
encoder,
Expand All @@ -61,15 +61,15 @@ where
EncodeBody::new_client(stream)
}

fn encode<T, U>(
fn encode<T, I, U>(
mut encoder: T,
source: U,
#[cfg(feature = "compression")] compression_encoding: Option<CompressionEncoding>,
#[cfg(feature = "compression")] compression_override: SingleMessageCompressionOverride,
) -> impl TryStream<Ok = Bytes, Error = Status>
where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
T: Encoder<I, Error = Status>,
U: Stream<Item = Result<I, Status>>,
{
async_stream::stream! {
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
Expand Down Expand Up @@ -101,7 +101,7 @@ where
{
uncompression_buf.clear();

encoder.encode(item, &mut EncodeBuf::new(&mut uncompression_buf))
encoder.encode(item, &mut uncompression_buf)
.map_err(|err| Status::internal(format!("Error encoding: {}", err)))?;

let uncompressed_len = uncompression_buf.len();
Expand All @@ -117,7 +117,7 @@ where
#[cfg(not(feature = "compression"))]
unreachable!("compression disabled, should not take this branch");
} else {
encoder.encode(item, &mut EncodeBuf::new(&mut buf))
encoder.encode(item, &mut buf)
.map_err(|err| Status::internal(format!("Error encoding: {}", err)))?;
}

Expand Down
35 changes: 2 additions & 33 deletions tonic/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//! This module contains the generic `Codec`, `Encoder` and `Decoder` traits
//! and a protobuf codec based on prost.
mod buffer;
#[cfg(feature = "compression")]
pub(crate) mod compression;
mod decode;
Expand All @@ -12,11 +11,9 @@ mod encode;
mod prost;

use crate::Status;
use std::io;

pub(crate) use self::encode::{encode_client, encode_server};

pub use self::buffer::{DecodeBuf, EncodeBuf};
#[cfg(feature = "compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "compression")))]
pub use self::compression::{CompressionEncoding, EnabledCompressionEncodings};
Expand All @@ -40,7 +37,7 @@ pub trait Codec: Default {
type Decode: Send + 'static;

/// The encoder that can encode a message.
type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + Sync + 'static;
type Encoder: Encoder<Self::Encode, Error = Status> + Send + Sync + 'static;
/// The encoder that can decode a message.
type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + Sync + 'static;

Expand All @@ -50,32 +47,4 @@ pub trait Codec: Default {
fn decoder(&mut self) -> Self::Decoder;
}

/// Encodes gRPC message types
pub trait Encoder {
/// The type that is encoded.
type Item;

/// The type of encoding errors.
///
/// The type of unrecoverable frame encoding errors.
type Error: From<io::Error>;

/// Encodes a message into the provided buffer.
fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error>;
}

/// Decodes gRPC message types
pub trait Decoder {
/// The type that is decoded.
type Item;

/// The type of unrecoverable frame decoding errors.
type Error: From<io::Error>;

/// Decode a message from the buffer.
///
/// The buffer will contain exactly the bytes of a full message. There
/// is no need to get the length from the bytes, gRPC framing is handled
/// for you.
fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error>;
}
pub use tokio_util::codec::{Decoder, Encoder};
Loading

0 comments on commit 3ddb479

Please sign in to comment.