Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Perf #295

Merged
merged 4 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 55 additions & 6 deletions secio/src/codec/secure_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bytes::{Bytes, BytesMut};
use bytes::{Buf, Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use log::{debug, trace};
use tokio::{
Expand All @@ -16,6 +16,45 @@ use std::{

use crate::{crypto::BoxStreamCipher, error::SecioError};

enum RecvBuf {
Vec(Vec<u8>),
Byte(BytesMut),
}

impl RecvBuf {
fn copy_to(&mut self, buf: &mut [u8], size: usize) {
driftluo marked this conversation as resolved.
Show resolved Hide resolved
match self {
RecvBuf::Vec(ref mut b) => {
buf[..size].copy_from_slice(b.drain(..size).as_slice());
}
RecvBuf::Byte(ref mut b) => {
buf[..size].copy_from_slice(&b[..size]);
b.advance(size);
}
}
}

fn len(&self) -> usize {
match self {
RecvBuf::Vec(ref b) => b.len(),
RecvBuf::Byte(ref b) => b.len(),
}
}

fn is_empty(&self) -> bool {
self.len() == 0
}
}

impl AsRef<[u8]> for RecvBuf {
fn as_ref(&self) -> &[u8] {
match self {
RecvBuf::Vec(ref b) => b.as_ref(),
RecvBuf::Byte(ref b) => b.as_ref(),
}
}
}

/// Encrypted stream
pub struct SecureStream<T> {
socket: Framed<T, LengthDelimitedCodec>,
Expand All @@ -31,7 +70,7 @@ pub struct SecureStream<T> {
/// frame from the underlying Framed<>, the frame will be filled
/// into this buffer so that multiple following 'read' will eventually
/// get the message correctly
recv_buf: Vec<u8>,
recv_buf: RecvBuf,
}

impl<T> SecureStream<T>
Expand All @@ -45,19 +84,29 @@ where
encode_cipher: BoxStreamCipher,
nonce: Vec<u8>,
) -> Self {
let recv_buf = if decode_cipher.is_in_place() {
RecvBuf::Byte(BytesMut::new())
} else {
RecvBuf::Vec(Vec::default())
};
SecureStream {
socket,
decode_cipher,
encode_cipher,
nonce,
recv_buf: Vec::default(),
recv_buf,
}
}

/// Decoding data
#[inline]
fn decode_buffer(&mut self, frame: BytesMut) -> Result<Vec<u8>, SecioError> {
Ok(self.decode_cipher.decrypt(&frame)?)
fn decode_buffer(&mut self, mut frame: BytesMut) -> Result<RecvBuf, SecioError> {
if self.decode_cipher.is_in_place() {
self.decode_cipher.decrypt_in_place(&mut frame)?;
Ok(RecvBuf::Byte(frame))
} else {
Ok(RecvBuf::Vec(self.decode_cipher.decrypt(&frame)?))
}
}

pub(crate) async fn verify_nonce(&mut self) -> Result<(), SecioError> {
Expand Down Expand Up @@ -93,7 +142,7 @@ where
let n = ::std::cmp::min(buf.len(), self.recv_buf.len());

// Copy data to the output buffer
buf[..n].copy_from_slice(self.recv_buf.drain(..n).as_slice());
self.recv_buf.copy_to(buf, n);

n
}
Expand Down
11 changes: 11 additions & 0 deletions secio/src/crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ pub trait StreamCipher {
fn encrypt(&mut self, input: &[u8]) -> Result<Vec<u8>, SecioError>;
/// Feeds data from input through the cipher, return decrypted bytes.
fn decrypt(&mut self, input: &[u8]) -> Result<Vec<u8>, SecioError>;
/// Whether support in place decrypt
#[inline]
fn is_in_place(&self) -> bool {
false
}
/// Feeds data from input through the cipher, in place decrypted.
fn decrypt_in_place(&mut self, _input: &mut bytes::BytesMut) -> Result<(), SecioError> {
Err(SecioError::InvalidProposition(
"don't support in place decrypted",
))
}
}

/// Crypto mode, encrypt or decrypt
Expand Down
41 changes: 31 additions & 10 deletions secio/src/crypto/ring_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,41 @@ impl RingAeadCipher {
.len()
.checked_sub(self.cipher_type.tag_size())
.ok_or(SecioError::FrameTooShort)?;
let mut output = Vec::with_capacity(output_len);
let mut buf = Vec::with_capacity(
self.cipher_type
.tag_size()
.checked_add(input.len())
.ok_or(SecioError::InvalidMessage)?,
);
let mut buf = Vec::with_capacity(input.len());

unsafe {
output.set_len(output_len);
buf.set_len(input.len());
}
buf.copy_from_slice(input);

if let RingAeadCryptoVariant::Open(ref mut key) = self.cipher {
match key.open_in_place(Aad::empty(), &mut buf) {
Ok(out_buf) => output.copy_from_slice(out_buf),
Ok(_) => (),
driftluo marked this conversation as resolved.
Show resolved Hide resolved
Err(e) => return Err(e.into()),
}
} else {
unreachable!("encrypt is called on a non-open cipher")
}
Ok(output)
buf.truncate(output_len);
Ok(buf)
}

pub fn decrypt_in_place(&mut self, input: &mut bytes::BytesMut) -> Result<(), SecioError> {
let output_len = input
.len()
.checked_sub(self.cipher_type.tag_size())
.ok_or(SecioError::FrameTooShort)?;

if let RingAeadCryptoVariant::Open(ref mut key) = self.cipher {
match key.open_in_place(Aad::empty(), input) {
Ok(_) => (),
Err(e) => return Err(e.into()),
}
driftluo marked this conversation as resolved.
Show resolved Hide resolved
} else {
unreachable!("encrypt is called on a non-open cipher")
}
input.truncate(output_len);
Ok(())
}
}

Expand All @@ -130,6 +142,15 @@ impl StreamCipher for RingAeadCipher {
fn decrypt(&mut self, input: &[u8]) -> Result<Vec<u8>, SecioError> {
self.decrypt(input)
}

#[inline]
fn is_in_place(&self) -> bool {
true
}

fn decrypt_in_place(&mut self, input: &mut bytes::BytesMut) -> Result<(), SecioError> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add a new fn to trait since the invoker has the mut reference already, can we change the original fn

fn decrypt(&mut self, input: &[u8]) -> Result<Vec<u8>, SecioError>

to

fn decrypt(&mut self, input: &mut [u8]) -> Result<(), SecioError>

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no new fn is added, OpenSSL will copy one more time, because OpenSSL bind does not have an in-place interface

Copy link
Collaborator Author

@driftluo driftluo Jan 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the same time, because &mut [u8] does not have shortened semantics(In other words, it doesn't have ownership, so it cannot be deleted/shortened), the delete tag operation on in-place cannot be performed on this type. Forcibly shortening its length with a pointer will cause a dangling pointer, so the type must be marked with shortening semantics type

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about change to BytesMut?

fn decrypt(&mut self, input: &mut BytesMut) -> Result<(), SecioError>

self.decrypt_in_place(input)
}
}

#[cfg(test)]
Expand Down
9 changes: 5 additions & 4 deletions yamux/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The substream, the main interface is AsyncRead/AsyncWrite

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use futures::{
channel::mpsc::{Receiver, Sender, UnboundedSender},
stream::FusedStream,
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct StreamHandle {
max_recv_window: u32,
recv_window: u32,
send_window: u32,
read_buf: BytesMut,
read_buf: Bytes,

// Send stream event to parent session
event_sender: Sender<StreamEvent>,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl StreamHandle {
max_recv_window: recv_window_size,
recv_window: recv_window_size,
send_window: send_window_size,
read_buf: BytesMut::default(),
read_buf: Bytes::default(),
event_sender,
unbound_event_sender,
frame_receiver,
Expand Down Expand Up @@ -275,7 +275,8 @@ impl StreamHandle {

let (_, body) = frame.into_parts();
if let Some(data) = body {
self.read_buf.extend_from_slice(&data);
// only when buf is empty, poll read can read from remote
self.read_buf = data;
}
self.recv_window -= length;
Ok(())
Expand Down