Skip to content

Commit

Permalink
Merge pull request #295 from nervosnetwork/perf
Browse files Browse the repository at this point in the history
perf: Perf
  • Loading branch information
driftluo authored Jan 21, 2021
2 parents b34e386 + 8ad1df3 commit b4edade
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 23 deletions.
61 changes: 55 additions & 6 deletions secio/src/codec/secure_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bytes::{Bytes, BytesMut};
use bytes::{Buf, Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use log::{debug, trace};
use tokio::{
Expand All @@ -16,6 +16,45 @@ use std::{

use crate::{crypto::BoxStreamCipher, error::SecioError};

enum RecvBuf {
Vec(Vec<u8>),
Byte(BytesMut),
}

impl RecvBuf {
fn drain_to(&mut self, buf: &mut [u8], size: usize) {
match self {
RecvBuf::Vec(ref mut b) => {
buf[..size].copy_from_slice(b.drain(..size).as_slice());
}
RecvBuf::Byte(ref mut b) => {
buf[..size].copy_from_slice(&b[..size]);
b.advance(size);
}
}
}

fn len(&self) -> usize {
match self {
RecvBuf::Vec(ref b) => b.len(),
RecvBuf::Byte(ref b) => b.len(),
}
}

fn is_empty(&self) -> bool {
self.len() == 0
}
}

impl AsRef<[u8]> for RecvBuf {
fn as_ref(&self) -> &[u8] {
match self {
RecvBuf::Vec(ref b) => b.as_ref(),
RecvBuf::Byte(ref b) => b.as_ref(),
}
}
}

/// Encrypted stream
pub struct SecureStream<T> {
socket: Framed<T, LengthDelimitedCodec>,
Expand All @@ -31,7 +70,7 @@ pub struct SecureStream<T> {
/// frame from the underlying Framed<>, the frame will be filled
/// into this buffer so that multiple following 'read' will eventually
/// get the message correctly
recv_buf: Vec<u8>,
recv_buf: RecvBuf,
}

impl<T> SecureStream<T>
Expand All @@ -45,19 +84,29 @@ where
encode_cipher: BoxStreamCipher,
nonce: Vec<u8>,
) -> Self {
let recv_buf = if decode_cipher.is_in_place() {
RecvBuf::Byte(BytesMut::new())
} else {
RecvBuf::Vec(Vec::default())
};
SecureStream {
socket,
decode_cipher,
encode_cipher,
nonce,
recv_buf: Vec::default(),
recv_buf,
}
}

/// Decoding data
#[inline]
fn decode_buffer(&mut self, frame: BytesMut) -> Result<Vec<u8>, SecioError> {
Ok(self.decode_cipher.decrypt(&frame)?)
fn decode_buffer(&mut self, mut frame: BytesMut) -> Result<RecvBuf, SecioError> {
if self.decode_cipher.is_in_place() {
self.decode_cipher.decrypt_in_place(&mut frame)?;
Ok(RecvBuf::Byte(frame))
} else {
Ok(RecvBuf::Vec(self.decode_cipher.decrypt(&frame)?))
}
}

pub(crate) async fn verify_nonce(&mut self) -> Result<(), SecioError> {
Expand Down Expand Up @@ -93,7 +142,7 @@ where
let n = ::std::cmp::min(buf.len(), self.recv_buf.len());

// Copy data to the output buffer
buf[..n].copy_from_slice(self.recv_buf.drain(..n).as_slice());
self.recv_buf.drain_to(buf, n);

n
}
Expand Down
12 changes: 12 additions & 0 deletions secio/src/crypto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::error::SecioError;
use bytes::BytesMut;

/// Define cipher
pub mod cipher;
Expand All @@ -21,6 +22,17 @@ pub trait StreamCipher {
fn encrypt(&mut self, input: &[u8]) -> Result<Vec<u8>, SecioError>;
/// Feeds data from input through the cipher, return decrypted bytes.
fn decrypt(&mut self, input: &[u8]) -> Result<Vec<u8>, SecioError>;
/// Whether support in place decrypt
#[inline]
fn is_in_place(&self) -> bool {
false
}
/// Feeds data from input through the cipher, in place decrypted.
fn decrypt_in_place(&mut self, _input: &mut BytesMut) -> Result<(), SecioError> {
Err(SecioError::InvalidProposition(
"don't support in place decrypted",
))
}
}

/// Crypto mode, encrypt or decrypt
Expand Down
41 changes: 28 additions & 13 deletions secio/src/crypto/ring_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,35 @@ impl RingAeadCipher {
.len()
.checked_sub(self.cipher_type.tag_size())
.ok_or(SecioError::FrameTooShort)?;
let mut output = Vec::with_capacity(output_len);
let mut buf = Vec::with_capacity(
self.cipher_type
.tag_size()
.checked_add(input.len())
.ok_or(SecioError::InvalidMessage)?,
);
let mut buf = Vec::with_capacity(input.len());

unsafe {
output.set_len(output_len);
buf.set_len(input.len());
}
buf.copy_from_slice(input);

if let RingAeadCryptoVariant::Open(ref mut key) = self.cipher {
match key.open_in_place(Aad::empty(), &mut buf) {
Ok(out_buf) => output.copy_from_slice(out_buf),
Err(e) => return Err(e.into()),
}
key.open_in_place(Aad::empty(), &mut buf)?;
} else {
unreachable!("encrypt is called on a non-open cipher")
}
Ok(output)
buf.truncate(output_len);
Ok(buf)
}

pub fn decrypt_in_place(&mut self, input: &mut BytesMut) -> Result<(), SecioError> {
let output_len = input
.len()
.checked_sub(self.cipher_type.tag_size())
.ok_or(SecioError::FrameTooShort)?;

if let RingAeadCryptoVariant::Open(ref mut key) = self.cipher {
key.open_in_place(Aad::empty(), input)?;
} else {
unreachable!("encrypt is called on a non-open cipher")
}
input.truncate(output_len);
Ok(())
}
}

Expand All @@ -130,6 +136,15 @@ impl StreamCipher for RingAeadCipher {
fn decrypt(&mut self, input: &[u8]) -> Result<Vec<u8>, SecioError> {
self.decrypt(input)
}

#[inline]
fn is_in_place(&self) -> bool {
true
}

fn decrypt_in_place(&mut self, input: &mut BytesMut) -> Result<(), SecioError> {
self.decrypt_in_place(input)
}
}

#[cfg(test)]
Expand Down
9 changes: 5 additions & 4 deletions yamux/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The substream, the main interface is AsyncRead/AsyncWrite
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use futures::{
channel::mpsc::{Receiver, Sender, UnboundedSender},
stream::FusedStream,
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct StreamHandle {
max_recv_window: u32,
recv_window: u32,
send_window: u32,
read_buf: BytesMut,
read_buf: Bytes,

// Send stream event to parent session
event_sender: Sender<StreamEvent>,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl StreamHandle {
max_recv_window: recv_window_size,
recv_window: recv_window_size,
send_window: send_window_size,
read_buf: BytesMut::default(),
read_buf: Bytes::default(),
event_sender,
unbound_event_sender,
frame_receiver,
Expand Down Expand Up @@ -275,7 +275,8 @@ impl StreamHandle {

let (_, body) = frame.into_parts();
if let Some(data) = body {
self.read_buf.extend_from_slice(&data);
// only when buf is empty, poll read can read from remote
self.read_buf = data;
}
self.recv_window -= length;
Ok(())
Expand Down

0 comments on commit b4edade

Please sign in to comment.