From 195df85172a22fe710e6ce082dbe82db5f6c8d19 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 3 Oct 2022 14:00:50 +0200 Subject: [PATCH] fix(dht/encryption): greatly reduce heap allocations for encrypted messaging (#4753) Description --- - Encrypt, decrypt and message padding mutate a single buffer for encrypted messages Motivation and Context --- Encrypted message handling should be as efficient as possible. The previous implementation performed allocations of the full padded message size twice for encryption and twice for decryption. Increasing memory usage, and negating the performance benefits of using an encryption keystream. This PR allocates a single buffer for the message to be de/encrypted and de/encrypts the contents in-place using the BytesMut type from the `bytes` crate. How Has This Been Tested? --- This change is backwards compatible, tested on current esme network and updated existing tests as required. Discovery: OK Memorynet: OK PingPong: OK InteractiveTransactions: OK SafTransactions: OK --- Cargo.lock | 1 - comms/core/src/lib.rs | 2 +- comms/core/src/message/mod.rs | 13 + comms/core/src/protocol/rpc/body.rs | 2 +- comms/dht/Cargo.toml | 1 - comms/dht/src/crypt.rs | 299 ++++++++++-------- comms/dht/src/dedup/mod.rs | 8 +- comms/dht/src/dht.rs | 11 +- comms/dht/src/envelope.rs | 5 +- comms/dht/src/inbound/decryption.rs | 51 ++- comms/dht/src/inbound/deserialize.rs | 4 +- comms/dht/src/inbound/dht_handler/task.rs | 2 +- comms/dht/src/inbound/forward.rs | 20 +- comms/dht/src/outbound/broadcast.rs | 24 +- comms/dht/src/outbound/message.rs | 9 +- comms/dht/src/outbound/mock.rs | 37 ++- comms/dht/src/outbound/requester.rs | 20 +- comms/dht/src/outbound/serialize.rs | 2 +- .../dht/src/store_forward/saf_handler/task.rs | 51 ++- comms/dht/src/store_forward/store.rs | 8 +- comms/dht/src/test_utils/makers.rs | 58 +++- comms/dht/src/test_utils/mod.rs | 2 + comms/dht/tests/dht.rs | 25 +- 23 files changed, 383 insertions(+), 272 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 577db90300..9cc0708252 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4805,7 +4805,6 @@ version = "0.38.4" dependencies = [ "anyhow", "bitflags 1.3.2", - "bytes 0.5.6", "chacha20 0.7.3", "chacha20poly1305 0.9.1", "chrono", diff --git a/comms/core/src/lib.rs b/comms/core/src/lib.rs index d72b52585a..570794feb7 100644 --- a/comms/core/src/lib.rs +++ b/comms/core/src/lib.rs @@ -66,6 +66,6 @@ pub mod multiaddr { } pub use async_trait::async_trait; -pub use bytes::{Bytes, BytesMut}; +pub use bytes::{Buf, BufMut, Bytes, BytesMut}; #[cfg(feature = "rpc")] pub use tower::make::MakeService; diff --git a/comms/core/src/message/mod.rs b/comms/core/src/message/mod.rs index d5a2718797..99172105fc 100644 --- a/comms/core/src/message/mod.rs +++ b/comms/core/src/message/mod.rs @@ -26,6 +26,8 @@ #[macro_use] mod envelope; + +use bytes::BytesMut; pub use envelope::EnvelopeBody; mod error; @@ -52,5 +54,16 @@ pub trait MessageExt: prost::Message { ); buf } + + /// Encodes a message into a BytesMut, allocating the buffer on the heap as necessary. + fn encode_into_bytes_mut(&self) -> BytesMut + where Self: Sized { + let mut buf = BytesMut::with_capacity(self.encoded_len()); + self.encode(&mut buf).expect( + "prost::Message::encode documentation says it is infallible unless the buffer has insufficient capacity. \ + This buffer's capacity was set with encoded_len", + ); + buf + } } impl MessageExt for T {} diff --git a/comms/core/src/protocol/rpc/body.rs b/comms/core/src/protocol/rpc/body.rs index 0790712e0e..a93ce23e8d 100644 --- a/comms/core/src/protocol/rpc/body.rs +++ b/comms/core/src/protocol/rpc/body.rs @@ -177,7 +177,7 @@ impl BodyBytes { } pub fn into_vec(self) -> Vec { - self.0.map(|bytes| bytes.to_vec()).unwrap_or_else(Vec::new) + self.0.map(|bytes| bytes.into()).unwrap_or_else(Vec::new) } pub fn into_bytes(self) -> Option { diff --git a/comms/dht/Cargo.toml b/comms/dht/Cargo.toml index 25d0590575..1fe961ee8e 100644 --- a/comms/dht/Cargo.toml +++ b/comms/dht/Cargo.toml @@ -21,7 +21,6 @@ tari_common_sqlite = { path = "../../common_sqlite" } anyhow = "1.0.53" bitflags = "1.2.0" -bytes = "0.5" chacha20 = "0.7.1" chacha20poly1305 = "0.9.1" chrono = { version = "0.4.19", default-features = false } diff --git a/comms/dht/src/crypt.rs b/comms/dht/src/crypt.rs index 518cace315..e37012db45 100644 --- a/comms/dht/src/crypt.rs +++ b/comms/dht/src/crypt.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::mem::size_of; +use std::{iter, mem::size_of}; use chacha20::{ cipher::{NewCipher, StreamCipher}, @@ -34,8 +34,13 @@ use chacha20poly1305::{ ChaCha20Poly1305, }; use digest::Digest; +use prost::bytes::BytesMut; use rand::{rngs::OsRng, RngCore}; -use tari_comms::types::{CommsPublicKey, CommsSecretKey}; +use tari_comms::{ + message::MessageExt, + types::{CommsPublicKey, CommsSecretKey}, + BufMut, +}; use tari_crypto::{ keys::DiffieHellmanSharedSecret, tari_utilities::{epoch_time::EpochTime, ByteArray}, @@ -69,31 +74,39 @@ pub fn generate_ecdh_secret(secret_key: &CommsSecretKey, public_key: &CommsPubli output } -fn pad_message_to_base_length_multiple(message: &[u8]) -> Result, DhtEncryptError> { - // We require a 32-bit length representation, and also don't want to overflow after including this encoding - if message.len() > ((u32::max_value() - (size_of::() as u32)) as usize) { - return Err(DhtEncryptError::PaddingError("Message is too long".to_string())); +fn get_message_padding_length(message_length: usize) -> usize { + if message_length == 0 { + return MESSAGE_BASE_LENGTH; } - let message_length = message.len(); - let encoded_length = (message_length as u32).to_le_bytes(); - // Pad the message (if needed) to the next multiple of the base length - let padding_length = if ((message_length + size_of::()) % MESSAGE_BASE_LENGTH) == 0 { + if message_length % MESSAGE_BASE_LENGTH == 0 { 0 } else { - MESSAGE_BASE_LENGTH - ((message_length + size_of::()) % MESSAGE_BASE_LENGTH) - }; - - // The padded message is the encoded length, message, and zero padding - let mut padded_message = Vec::with_capacity(size_of::() + message_length + padding_length); - padded_message.extend_from_slice(&encoded_length); - padded_message.extend_from_slice(message); - padded_message.extend(std::iter::repeat(0u8).take(padding_length)); + MESSAGE_BASE_LENGTH - (message_length % MESSAGE_BASE_LENGTH) + } +} - Ok(padded_message) +/// Pads a message to a multiple of MESSAGE_BASE_LENGTH excluding the additional prefix space +fn pad_message_to_base_length_multiple( + message: &mut BytesMut, + additional_prefix_space: usize, +) -> Result<(), DhtEncryptError> { + // We require a 32-bit length representation, and also don't want to overflow after including this encoding + if message.len() > u32::MAX as usize { + return Err(DhtEncryptError::PaddingError("Message is too long".to_string())); + } + let padding_length = + get_message_padding_length(message.len().checked_sub(additional_prefix_space).ok_or_else(|| { + DhtEncryptError::PaddingError("Message length shorter than the additional_prefix_space".to_string()) + })?); + message.reserve(message.len() + padding_length); + message.extend(iter::repeat(0u8).take(padding_length)); + + Ok(()) } -fn get_original_message_from_padded_text(padded_message: &[u8]) -> Result, DhtEncryptError> { +/// Returns the unpadded message. The messages must have the length prefixed to it and the nonce is removec. +fn get_original_message_from_padded_text(padded_message: &mut BytesMut) -> Result<(), DhtEncryptError> { // NOTE: This function can return errors relating to message length // It is important not to leak error types to an adversary, or to have timing differences @@ -112,25 +125,22 @@ fn get_original_message_from_padded_text(padded_message: &[u8]) -> Result()); let mut encoded_length = [0u8; size_of::()]; - encoded_length.copy_from_slice(&padded_message[0..size_of::()]); + encoded_length.copy_from_slice(&len[..]); let message_length = u32::from_le_bytes(encoded_length) as usize; // The padded message is too short for the decoded length - let end = message_length - .checked_add(size_of::()) - .ok_or_else(|| DhtEncryptError::PaddingError("Claimed unpadded message length is too large".to_string()))?; - if end > padded_message.len() { + if message_length > padded_message.len() { return Err(DhtEncryptError::CipherError( "Claimed unpadded message length is too large".to_string(), )); } // Remove the padding (we don't check for valid padding, as this is offloaded to authentication) - let start = size_of::(); - let unpadded_message = &padded_message[start..end]; + padded_message.truncate(message_length); - Ok(unpadded_message.to_vec()) + Ok(()) } pub fn generate_key_message(data: &[u8]) -> CipherKey { @@ -150,21 +160,20 @@ pub fn generate_key_signature_for_authenticated_encryption(data: &[u8]) -> Authe } /// Decrypts cipher text using ChaCha20 stream cipher given the cipher key and cipher text with integral nonce. -pub fn decrypt(cipher_key: &CipherKey, cipher_text: &[u8]) -> Result, DhtEncryptError> { +pub fn decrypt(cipher_key: &CipherKey, cipher_text: &mut BytesMut) -> Result<(), DhtEncryptError> { if cipher_text.len() < size_of::() { return Err(DhtEncryptError::InvalidDecryptionNonceNotIncluded); } - let (nonce, cipher_text) = cipher_text.split_at(size_of::()); - let nonce = Nonce::from_slice(nonce); - let mut cipher_text = cipher_text.to_vec(); + let nonce = cipher_text.split_to(size_of::()); + let nonce = Nonce::from_slice(&nonce); let mut cipher = ChaCha20::new(&cipher_key.0, nonce); - cipher.apply_keystream(cipher_text.as_mut_slice()); + cipher.apply_keystream(cipher_text); // get original message, from decrypted padded cipher text - let cipher_text = get_original_message_from_padded_text(cipher_text.as_slice())?; - Ok(cipher_text) + get_original_message_from_padded_text(cipher_text)?; + Ok(()) } pub fn decrypt_with_chacha20_poly1305( @@ -183,23 +192,49 @@ pub fn decrypt_with_chacha20_poly1305( Ok(decrypted_signature) } -/// Encrypt the plain text using the ChaCha20 stream cipher -pub fn encrypt(cipher_key: &CipherKey, plain_text: &[u8]) -> Result, DhtEncryptError> { - // pad plain_text to avoid message length leaks - let plain_text = pad_message_to_base_length_multiple(plain_text)?; - +/// Encrypt the plain text using the ChaCha20 stream cipher. The message is assumed to have a 32-bit length prepended +/// onto it. +pub fn encrypt(cipher_key: &CipherKey, plain_text: &mut BytesMut) -> Result<(), DhtEncryptError> { + if plain_text.len() < size_of::() { + return Err(DhtEncryptError::PaddingError( + "Message is not long enough to include a nonce".to_string(), + )); + } + // add nonce let mut nonce = [0u8; size_of::()]; OsRng.fill_bytes(&mut nonce); + let nonce = Nonce::from(nonce); + plain_text[..size_of::()].copy_from_slice(&nonce[..]); + + // pad plain_text to avoid message length leaks + // Excludes the nonce in the padded message length - this is mostly for backwards compatibility + pad_message_to_base_length_multiple(plain_text, size_of::())?; - let nonce_ga = Nonce::from_slice(&nonce); - let mut cipher = ChaCha20::new(&cipher_key.0, nonce_ga); + let mut cipher = ChaCha20::new(&cipher_key.0, &nonce); - let mut buf = vec![0u8; plain_text.len() + nonce.len()]; - buf[..nonce.len()].copy_from_slice(&nonce[..]); + cipher.apply_keystream(&mut plain_text[size_of::()..]); + Ok(()) +} - buf[nonce.len()..].copy_from_slice(plain_text.as_slice()); - cipher.apply_keystream(&mut buf[nonce.len()..]); - Ok(buf) +/// Encodes a prost Message, efficiently prepending the little-endian 32-bit length to the encoding +fn encode_with_prepended_length(msg: &T, additional_prefix_space: usize) -> BytesMut { + let len = msg.encoded_len(); + let mut buf = BytesMut::with_capacity(size_of::() + additional_prefix_space + len); + buf.extend(iter::repeat(0).take(additional_prefix_space)); + buf.put_u32_le(len as u32); + msg.encode(&mut buf).expect( + "prost::Message::encode documentation says it is infallible unless the buffer has insufficient capacity. This \ + buffer's capacity was set with encoded_len", + ); + buf +} + +pub fn prepare_message(is_encrypted: bool, message: &T) -> BytesMut { + if is_encrypted { + encode_with_prepended_length(message, size_of::()) + } else { + message.encode_into_bytes_mut() + } } /// Produces authenticated encryption of the signature using the ChaCha20-Poly1305 stream cipher, @@ -276,6 +311,8 @@ pub fn create_message_domain_separated_hash_parts( #[cfg(test)] mod test { + use prost::Message; + use tari_comms::message::MessageExt; use tari_crypto::keys::PublicKey; use tari_utilities::hex::from_hex; @@ -285,10 +322,11 @@ mod test { fn encrypt_decrypt() { let pk = CommsPublicKey::default(); let key = CipherKey(*chacha20::Key::from_slice(pk.as_bytes())); - let plain_text = "Last enemy position 0830h AJ 9863".as_bytes().to_vec(); - let encrypted = encrypt(&key, &plain_text).unwrap(); - let decrypted = decrypt(&key, &encrypted).unwrap(); - assert_eq!(decrypted, plain_text); + let plain_text = "Last enemy position 0830h AJ 9863".to_string(); + let mut msg = prepare_message(true, &plain_text); + encrypt(&key, &mut msg).unwrap(); + decrypt(&key, &mut msg).unwrap(); + assert_eq!(String::decode(&msg[..]).unwrap(), plain_text); } #[test] @@ -299,9 +337,11 @@ mod test { "", ) .unwrap(); - let plain_text = decrypt(&key, &cipher_text).unwrap(); + + let mut text = BytesMut::from(&cipher_text[..]); + decrypt(&key, &mut text).unwrap(); let secret_msg = "Last enemy position 0830h AJ 9863".as_bytes().to_vec(); - assert_eq!(plain_text, secret_msg); + assert_eq!(text, secret_msg); } #[test] @@ -399,106 +439,86 @@ mod test { #[test] fn pad_message_correctness() { // test for small message - let message = &[0u8, 10, 22, 11, 38, 74, 59, 91, 73, 82, 75, 23, 59]; - let prepend_message = (message.len() as u32).to_le_bytes(); - let pad = std::iter::repeat(0u8) - .take(MESSAGE_BASE_LENGTH - message.len() - prepend_message.len()) + let message = [0u8, 10, 22, 11, 38, 74, 59, 91, 73, 82, 75, 23, 59].as_slice(); + let pad = iter::repeat(0u8) + .take(MESSAGE_BASE_LENGTH - message.len()) .collect::>(); - let pad_message = pad_message_to_base_length_multiple(message).unwrap(); + let mut pad_message = BytesMut::from(message); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); // padded message is of correct length assert_eq!(pad_message.len(), MESSAGE_BASE_LENGTH); - // prepend message is well specified - assert_eq!(prepend_message, pad_message[..prepend_message.len()]); // message body is well specified - assert_eq!( - *message, - pad_message[prepend_message.len()..prepend_message.len() + message.len()] - ); + assert_eq!(*message, pad_message[..message.len()]); // pad is well specified - assert_eq!(pad, pad_message[prepend_message.len() + message.len()..]); + assert_eq!(pad, pad_message[message.len()..]); // test for large message - let message = &[100u8; MESSAGE_BASE_LENGTH * 8 - 100]; - let prepend_message = (message.len() as u32).to_le_bytes(); - let pad_message = pad_message_to_base_length_multiple(message).unwrap(); - let pad = std::iter::repeat(0u8) - .take((8 * MESSAGE_BASE_LENGTH) - message.len() - prepend_message.len()) + let message = encode_with_prepended_length(&vec![100u8; MESSAGE_BASE_LENGTH * 8 - 100], 0); + let mut pad_message = message.clone(); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); + let pad = iter::repeat(0u8) + .take((8 * MESSAGE_BASE_LENGTH) - message.len()) .collect::>(); // padded message is of correct length assert_eq!(pad_message.len(), 8 * MESSAGE_BASE_LENGTH); - // prepend message is well specified - assert_eq!(prepend_message, pad_message[..prepend_message.len()]); // message body is well specified - assert_eq!( - *message, - pad_message[prepend_message.len()..prepend_message.len() + message.len()] - ); + assert_eq!(*message, pad_message[..message.len()]); // pad is well specified - assert_eq!(pad, pad_message[prepend_message.len() + message.len()..]); + assert_eq!(pad, pad_message[message.len()..]); // test for base message of multiple base length - let message = &[100u8; MESSAGE_BASE_LENGTH * 9 - 123]; - let prepend_message = (message.len() as u32).to_le_bytes(); + let message = encode_with_prepended_length(&vec![100u8; MESSAGE_BASE_LENGTH * 9 - 123], 0); let pad = std::iter::repeat(0u8) - .take((9 * MESSAGE_BASE_LENGTH) - message.len() - prepend_message.len()) + .take((9 * MESSAGE_BASE_LENGTH) - message.len()) .collect::>(); - let pad_message = pad_message_to_base_length_multiple(message).unwrap(); + let mut pad_message = message.clone(); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); // padded message is of correct length assert_eq!(pad_message.len(), 9 * MESSAGE_BASE_LENGTH); - // prepend message is well specified - assert_eq!(prepend_message, pad_message[..prepend_message.len()]); // message body is well specified - assert_eq!( - *message, - pad_message[prepend_message.len()..prepend_message.len() + message.len()] - ); + assert_eq!(*message, pad_message[..message.len()]); // pad is well specified - assert_eq!(pad, pad_message[prepend_message.len() + message.len()..]); + assert_eq!(pad, pad_message[message.len()..]); // test for empty message - let message: [u8; 0] = []; - let prepend_message = (message.len() as u32).to_le_bytes(); - let pad_message = pad_message_to_base_length_multiple(&message).unwrap(); + let message = encode_with_prepended_length(&vec![], 0); + let mut pad_message = message.clone(); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); let pad = [0u8; MESSAGE_BASE_LENGTH - 4]; // padded message is of correct length assert_eq!(pad_message.len(), MESSAGE_BASE_LENGTH); - // prepend message is well specified - assert_eq!(prepend_message, pad_message[..prepend_message.len()]); // message body is well specified - assert_eq!( - message, - pad_message[prepend_message.len()..prepend_message.len() + message.len()] - ); + assert_eq!(message, pad_message[..message.len()]); // pad is well specified - assert_eq!(pad, pad_message[prepend_message.len() + message.len()..]); + assert_eq!(pad, pad_message[message.len()..]); } #[test] fn unpadding_failure_modes() { // The padded message is empty - let message: [u8; 0] = []; - assert!(get_original_message_from_padded_text(&message) + let mut message = BytesMut::new(); + assert!(get_original_message_from_padded_text(&mut message) .unwrap_err() .to_string() .contains("Padded message is not long enough for length extraction")); // We cannot extract the message length - let message = [0u8; size_of::() - 1]; - assert!(get_original_message_from_padded_text(&message) + let mut message = BytesMut::from([0u8; size_of::() - 1].as_slice()); + assert!(get_original_message_from_padded_text(&mut message) .unwrap_err() .to_string() .contains("Padded message is not long enough for length extraction")); // The padded message is not a multiple of the base length - let message = [0u8; 2 * MESSAGE_BASE_LENGTH + 1]; - assert!(get_original_message_from_padded_text(&message) + let mut message = BytesMut::from([0u8; 2 * MESSAGE_BASE_LENGTH + 1].as_slice()); + assert!(get_original_message_from_padded_text(&mut message) .unwrap_err() .to_string() .contains("Padded message must be a multiple of the base length")); @@ -508,44 +528,56 @@ mod test { fn get_original_message_from_padded_text_successful() { // test for short message let message = vec![0u8, 10, 22, 11, 38, 74, 59, 91, 73, 82, 75, 23, 59]; - let pad_message = pad_message_to_base_length_multiple(message.as_slice()).unwrap(); + let mut pad_message = encode_with_prepended_length(&message, 0); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); - let output_message = get_original_message_from_padded_text(pad_message.as_slice()).unwrap(); - assert_eq!(message, output_message); + // + let mut output_message = pad_message.clone(); + get_original_message_from_padded_text(&mut output_message).unwrap(); + assert_eq!(message.to_encoded_bytes(), output_message); // test for large message let message = vec![100u8; 1024]; - let pad_message = pad_message_to_base_length_multiple(message.as_slice()).unwrap(); + let mut pad_message = encode_with_prepended_length(&message, 0); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); - let output_message = get_original_message_from_padded_text(pad_message.as_slice()).unwrap(); - assert_eq!(message, output_message); + let mut output_message = pad_message.clone(); + get_original_message_from_padded_text(&mut output_message).unwrap(); + assert_eq!(message.to_encoded_bytes(), output_message); // test for base message of base length let message = vec![100u8; 984]; - let pad_message = pad_message_to_base_length_multiple(message.as_slice()).unwrap(); + let mut pad_message = encode_with_prepended_length(&message, 0); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); - let output_message = get_original_message_from_padded_text(pad_message.as_slice()).unwrap(); - assert_eq!(message, output_message); + let mut output_message = pad_message.clone(); + get_original_message_from_padded_text(&mut output_message).unwrap(); + assert_eq!(message.to_encoded_bytes(), output_message); // test for empty message let message: Vec = vec![]; - let pad_message = pad_message_to_base_length_multiple(message.as_slice()).unwrap(); + let mut pad_message = encode_with_prepended_length(&message, 0); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); - let output_message = get_original_message_from_padded_text(pad_message.as_slice()).unwrap(); - assert_eq!(message, output_message); + let mut output_message = pad_message.clone(); + get_original_message_from_padded_text(&mut output_message).unwrap(); + assert_eq!(message.to_encoded_bytes(), output_message); } #[test] fn padding_fails_if_pad_message_prepend_length_is_bigger_than_plaintext_length() { - let message = "This is my secret message, keep it secret !".as_bytes(); - let mut pad_message = pad_message_to_base_length_multiple(message).unwrap(); + let message = "This is my secret message, keep it secret !".as_bytes().to_vec(); + let mut pad_message = encode_with_prepended_length(&message, 0); + pad_message_to_base_length_multiple(&mut pad_message, 0).unwrap(); + let mut pad_message = pad_message.to_vec(); // we modify the prepend length, in order to assert that the get original message // method will output a different length message pad_message[0] = 1; - let modified_message = get_original_message_from_padded_text(pad_message.as_slice()).unwrap(); - assert!(message.len() != modified_message.len()); + let mut modified_message = BytesMut::from(pad_message.as_slice()); + get_original_message_from_padded_text(&mut modified_message).unwrap(); + assert_ne!(message.len(), modified_message.len()); // add big number from le bytes of prepend bytes pad_message[0] = 255; @@ -553,7 +585,8 @@ mod test { pad_message[2] = 255; pad_message[3] = 255; - assert!(get_original_message_from_padded_text(pad_message.as_slice()) + let mut pad_message = BytesMut::from(pad_message.as_slice()); + assert!(get_original_message_from_padded_text(&mut pad_message) .unwrap_err() .to_string() .contains("Claimed unpadded message length is too large")); @@ -565,24 +598,30 @@ mod test { // in any way the value of the decrypted content, by applying a cipher stream let pk = CommsPublicKey::default(); let key = CipherKey(*chacha20::Key::from_slice(pk.as_bytes())); - let message = "My secret message, keep it secret !".as_bytes().to_vec(); - let mut encrypted = encrypt(&key, &message).unwrap(); + let message = "My secret message, keep it secret !".to_string(); + let mut msg = encode_with_prepended_length(&message, size_of::()); + encrypt(&key, &mut msg).unwrap(); - let n = encrypted.len(); - encrypted[n - 1] += 1; + let n = msg.len(); + msg[n - 1] += 1; - assert!(decrypt(&key, &encrypted).unwrap() == message); + decrypt(&key, &mut msg).unwrap(); + assert_eq!(String::decode(&msg[..]).unwrap(), message); } #[test] fn decryption_fails_if_message_body_is_modified() { let pk = CommsPublicKey::default(); let key = CipherKey(*chacha20::Key::from_slice(pk.as_bytes())); - let message = "My secret message, keep it secret !".as_bytes().to_vec(); - let mut encrypted = encrypt(&key, &message).unwrap(); + let message = "My secret message, keep it secret !".to_string(); + let mut msg = encode_with_prepended_length(&message, size_of::()); + encrypt(&key, &mut msg).unwrap(); - encrypted[size_of::() + size_of::() + 1] += 1; + msg[size_of::() + size_of::() + 1] += 1; - assert!(decrypt(&key, &encrypted).unwrap() != message); + // TODO: decryption does not "fail" is this intended? + decrypt(&key, &mut msg).unwrap(); + eprintln!("msg = {:?}", msg); + assert_ne!(msg, message); } } diff --git a/comms/dht/src/dedup/mod.rs b/comms/dht/src/dedup/mod.rs index fa7cf3f20e..f49e48a4a4 100644 --- a/comms/dht/src/dedup/mod.rs +++ b/comms/dht/src/dedup/mod.rs @@ -197,7 +197,7 @@ mod test { assert!(dedup.poll_ready(&mut cx).is_ready()); let node_identity = make_node_identity(); let inbound_message = - make_dht_inbound_message(&node_identity, vec![], DhtMessageFlags::empty(), false, false).unwrap(); + make_dht_inbound_message(&node_identity, &vec![], DhtMessageFlags::empty(), false, false).unwrap(); let decrypted_msg = DecryptedDhtMessage::succeeded(wrap_in_envelope_body!(vec![]), None, inbound_message); rt.block_on(dedup.call(decrypted_msg.clone())).unwrap(); @@ -213,12 +213,12 @@ mod test { #[test] fn deterministic_hash() { const TEST_MSG: &[u8] = b"test123"; - const EXPECTED_HASH: &str = "d6333668f259f677703fbe4e89152ee41c7c01f6dec502befc63120246523ffe"; + const EXPECTED_HASH: &str = "1c2bb1bcff443af4441b789bd1d6984bb8d7bed2c9f85e8cf4f45615fdd9e47d"; let node_identity = make_node_identity(); let dht_message = make_dht_inbound_message( &node_identity, - TEST_MSG.to_vec(), + &TEST_MSG.to_vec(), DhtMessageFlags::empty(), false, false, @@ -229,7 +229,7 @@ mod test { let node_identity = make_node_identity(); let dht_message = make_dht_inbound_message( &node_identity, - TEST_MSG.to_vec(), + &TEST_MSG.to_vec(), DhtMessageFlags::empty(), false, false, diff --git a/comms/dht/src/dht.rs b/comms/dht/src/dht.rs index 5361665f42..e3acdae98a 100644 --- a/comms/dht/src/dht.rs +++ b/comms/dht/src/dht.rs @@ -494,7 +494,7 @@ mod test { let msg = wrap_in_envelope_body!(b"secret".to_vec()); let dht_envelope = make_dht_envelope( &node_identity, - msg.to_encoded_bytes(), + &msg, DhtMessageFlags::empty(), false, MessageTag::new(), @@ -546,7 +546,7 @@ mod test { // Encrypt for self let dht_envelope = make_dht_envelope( &node_identity, - msg.to_encoded_bytes(), + &msg, DhtMessageFlags::ENCRYPTED, true, MessageTag::new(), @@ -602,10 +602,11 @@ mod test { let node_identity2 = make_node_identity(); let ecdh_key = crypt::generate_ecdh_secret(node_identity2.secret_key(), node_identity2.public_key()); let key_message = crypt::generate_key_message(&ecdh_key); - let encrypted_bytes = crypt::encrypt(&key_message, &msg.to_encoded_bytes()).unwrap(); + let mut encrypted_bytes = msg.encode_into_bytes_mut(); + crypt::encrypt(&key_message, &mut encrypted_bytes).unwrap(); let dht_envelope = make_dht_envelope( &node_identity2, - encrypted_bytes, + &encrypted_bytes.to_vec(), DhtMessageFlags::ENCRYPTED, true, MessageTag::new(), @@ -667,7 +668,7 @@ mod test { let msg = wrap_in_envelope_body!(b"secret".to_vec()); let mut dht_envelope = make_dht_envelope( &node_identity, - msg.to_encoded_bytes(), + &msg, DhtMessageFlags::empty(), false, MessageTag::new(), diff --git a/comms/dht/src/envelope.rs b/comms/dht/src/envelope.rs index 27038803af..3f4f2ef06e 100644 --- a/comms/dht/src/envelope.rs +++ b/comms/dht/src/envelope.rs @@ -28,7 +28,6 @@ use std::{ }; use bitflags::bitflags; -use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, Utc}; use prost_types::Timestamp; use serde::{Deserialize, Serialize}; @@ -249,10 +248,10 @@ impl From for DhtHeader { } impl DhtEnvelope { - pub fn new(header: DhtHeader, body: &Bytes) -> Self { + pub fn new(header: DhtHeader, body: Vec) -> Self { Self { header: Some(header), - body: body.to_vec(), + body, } } } diff --git a/comms/dht/src/inbound/decryption.rs b/comms/dht/src/inbound/decryption.rs index 3c9c9e634c..419baf00c0 100644 --- a/comms/dht/src/inbound/decryption.rs +++ b/comms/dht/src/inbound/decryption.rs @@ -30,6 +30,7 @@ use tari_comms::{ message::EnvelopeBody, peer_manager::NodeIdentity, pipeline::PipelineError, + BytesMut, }; use thiserror::Error; use tower::{layer::Layer, Service, ServiceExt}; @@ -406,11 +407,11 @@ where S: Service message_body: &[u8], ) -> Result { let key_message = crypt::generate_key_message(shared_secret); - let decrypted = - crypt::decrypt(&key_message, message_body).map_err(DecryptionError::DecryptionFailedMalformedCipher)?; + let mut decrypted = BytesMut::from(message_body); + crypt::decrypt(&key_message, &mut decrypted).map_err(DecryptionError::DecryptionFailedMalformedCipher)?; // Deserialization into an EnvelopeBody is done here to determine if the // decryption produced valid bytes or not. - EnvelopeBody::decode(decrypted.as_slice()) + EnvelopeBody::decode(decrypted.freeze()) .and_then(|body| { // Check if we received a body length of zero // @@ -477,10 +478,11 @@ mod test { use futures::{executor::block_on, future}; use tari_comms::{ - message::{MessageExt, MessageTag}, + message::MessageTag, runtime, test_utils::mocks::create_connectivity_mock, wrap_in_envelope_body, + BytesMut, }; use tari_test_utils::{counter_context, unpack_enum}; use tokio::time::sleep; @@ -492,6 +494,7 @@ mod test { test_utils::{ make_dht_header, make_dht_inbound_message, + make_dht_inbound_message_raw, make_keypair, make_node_identity, make_valid_message_signature, @@ -527,14 +530,8 @@ mod test { let mut service = DecryptionService::new(Default::default(), node_identity.clone(), connectivity, service); let plain_text_msg = wrap_in_envelope_body!(b"Secret plans".to_vec()); - let inbound_msg = make_dht_inbound_message( - &node_identity, - plain_text_msg.to_encoded_bytes(), - DhtMessageFlags::ENCRYPTED, - true, - true, - ) - .unwrap(); + let inbound_msg = + make_dht_inbound_message(&node_identity, &plain_text_msg, DhtMessageFlags::ENCRYPTED, true, true).unwrap(); block_on(service.call(inbound_msg)).unwrap(); let decrypted = result.lock().unwrap().take().unwrap(); @@ -560,7 +557,7 @@ mod test { let some_other_node_identity = make_node_identity(); let inbound_msg = make_dht_inbound_message( &some_other_node_identity, - some_secret, + &some_secret, DhtMessageFlags::ENCRYPTED, true, true, @@ -591,7 +588,7 @@ mod test { let nonsense = b"Cannot Decrypt this".to_vec(); let inbound_msg = - make_dht_inbound_message(&node_identity, nonsense.clone(), DhtMessageFlags::ENCRYPTED, true, true).unwrap(); + make_dht_inbound_message_raw(&node_identity, nonsense, DhtMessageFlags::ENCRYPTED, true, true).unwrap(); let err = service.call(inbound_msg).await.unwrap_err(); let err = err.downcast::().unwrap(); @@ -615,14 +612,8 @@ mod test { let mut service = DecryptionService::new(Default::default(), node_identity.clone(), connectivity, service); let plain_text_msg = b"Secret message to nowhere".to_vec(); - let inbound_msg = make_dht_inbound_message( - &node_identity, - plain_text_msg.to_encoded_bytes(), - DhtMessageFlags::ENCRYPTED, - true, - false, - ) - .unwrap(); + let inbound_msg = + make_dht_inbound_message(&node_identity, &plain_text_msg, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); let err = service.call(inbound_msg).await.unwrap_err(); let err = err.downcast::().unwrap(); @@ -645,13 +636,15 @@ mod test { let node_identity = make_node_identity(); let mut service = DecryptionService::new(Default::default(), node_identity.clone(), connectivity, service); - let plain_text_msg = b"Secret message".to_vec(); + let plain_text_msg = BytesMut::from(b"Secret message".as_slice()); let (e_secret_key, e_public_key) = make_keypair(); let shared_secret = crypt::generate_ecdh_secret(&e_secret_key, node_identity.public_key()); let key_message = crypt::generate_key_message(&shared_secret); let msg_tag = MessageTag::new(); - let message = crypt::encrypt(&key_message, &plain_text_msg).unwrap(); + let mut message = plain_text_msg.clone(); + crypt::encrypt(&key_message, &mut message).unwrap(); + let message = message.freeze(); let header = make_dht_header( &node_identity, &e_public_key, @@ -663,7 +656,7 @@ mod test { true, ) .unwrap(); - let envelope = DhtEnvelope::new(header.into(), &message.into()); + let envelope = DhtEnvelope::new(header.into(), message.into()); let msg_tag = MessageTag::new(); let mut inbound_msg = DhtInboundMessage::new( msg_tag, @@ -706,13 +699,15 @@ mod test { let node_identity = make_node_identity(); let mut service = DecryptionService::new(Default::default(), node_identity.clone(), connectivity, service); - let plain_text_msg = b"Public message".to_vec(); + let plain_text_msg = BytesMut::from(b"Public message".as_slice()); let (e_secret_key, e_public_key) = make_keypair(); let shared_secret = crypt::generate_ecdh_secret(&e_secret_key, node_identity.public_key()); let key_message = crypt::generate_key_message(&shared_secret); let msg_tag = MessageTag::new(); - let message = crypt::encrypt(&key_message, &plain_text_msg).unwrap(); + let mut message = plain_text_msg.clone(); + crypt::encrypt(&key_message, &mut message).unwrap(); + let message = message.freeze(); let header = make_dht_header( &node_identity, &e_public_key, @@ -724,7 +719,7 @@ mod test { true, ) .unwrap(); - let envelope = DhtEnvelope::new(header.into(), &message.into()); + let envelope = DhtEnvelope::new(header.into(), message.into()); let msg_tag = MessageTag::new(); let mut inbound_msg = DhtInboundMessage::new( msg_tag, diff --git a/comms/dht/src/inbound/deserialize.rs b/comms/dht/src/inbound/deserialize.rs index 7f50c7317a..23537899ff 100644 --- a/comms/dht/src/inbound/deserialize.rs +++ b/comms/dht/src/inbound/deserialize.rs @@ -161,7 +161,7 @@ mod test { let dht_envelope = make_dht_envelope( &node_identity, - b"A".to_vec(), + &b"A".to_vec(), DhtMessageFlags::empty(), false, MessageTag::new(), @@ -181,7 +181,7 @@ mod test { .unwrap(); let msg = spy.pop_request().unwrap(); - assert_eq!(msg.body, b"A".to_vec()); + assert_eq!(msg.body, b"A".to_vec().to_encoded_bytes()); assert_eq!(msg.dht_header, dht_envelope.header.unwrap().try_into().unwrap()); } } diff --git a/comms/dht/src/inbound/dht_handler/task.rs b/comms/dht/src/inbound/dht_handler/task.rs index 1760b47295..e6ee3c7a5d 100644 --- a/comms/dht/src/inbound/dht_handler/task.rs +++ b/comms/dht/src/inbound/dht_handler/task.rs @@ -234,7 +234,7 @@ where S: Service .with_debug_info("Propagating join message".to_string()) .with_dht_header(dht_header) .finish(), - body.to_encoded_bytes(), + body.encode_into_bytes_mut(), ) .await?; } diff --git a/comms/dht/src/inbound/forward.rs b/comms/dht/src/inbound/forward.rs index 7ddd9e4fa7..e687eff8a1 100644 --- a/comms/dht/src/inbound/forward.rs +++ b/comms/dht/src/inbound/forward.rs @@ -24,7 +24,8 @@ use std::task::Poll; use futures::{future::BoxFuture, task::Context}; use log::*; -use tari_comms::{peer_manager::Peer, pipeline::PipelineError}; +use prost::bytes::BufMut; +use tari_comms::{peer_manager::Peer, pipeline::PipelineError, BytesMut}; use tari_utilities::epoch_time::EpochTime; use tower::{layer::Layer, Service, ServiceExt}; @@ -204,12 +205,11 @@ where S: Service return Ok(()); } } - - let body = decryption_result + let err_body = decryption_result .as_ref() - .err() - .cloned() - .expect("previous check that decryption failed"); + .expect_err("previous check that decryption failed"); + let mut body = BytesMut::with_capacity(err_body.len()); + body.put(err_body.as_slice()); let excluded_peers = vec![source_peer.node_id.clone()]; let dest_node_id = dht_header.destination.to_derived_node_id(); @@ -259,7 +259,7 @@ where S: Service mod test { use std::time::Duration; - use tari_comms::{runtime, runtime::task, wrap_in_envelope_body}; + use tari_comms::{message::MessageExt, runtime, runtime::task, wrap_in_envelope_body}; use tokio::sync::mpsc; use super::*; @@ -278,7 +278,7 @@ mod test { let node_identity = make_node_identity(); let inbound_msg = - make_dht_inbound_message(&node_identity, b"".to_vec(), DhtMessageFlags::empty(), false, false).unwrap(); + make_dht_inbound_message(&node_identity, &b"".to_vec(), DhtMessageFlags::empty(), false, false).unwrap(); let msg = DecryptedDhtMessage::succeeded( wrap_in_envelope_body!(Vec::new()), Some(node_identity.public_key().clone()), @@ -300,7 +300,7 @@ mod test { let sample_body = b"Lorem ipsum"; let inbound_msg = make_dht_inbound_message( &make_node_identity(), - sample_body.to_vec(), + &sample_body.to_vec(), DhtMessageFlags::empty(), false, false, @@ -318,7 +318,7 @@ mod test { let (params, body) = oms_mock_state.pop_call().await.unwrap(); // Header and body are preserved when forwarding - assert_eq!(&body.to_vec(), &sample_body); + assert_eq!(&body.to_vec(), &sample_body.to_vec().to_encoded_bytes()); assert_eq!(params.dht_header.unwrap(), header); } } diff --git a/comms/dht/src/outbound/broadcast.rs b/comms/dht/src/outbound/broadcast.rs index 8999d2fd41..1dd2b49649 100644 --- a/comms/dht/src/outbound/broadcast.rs +++ b/comms/dht/src/outbound/broadcast.rs @@ -22,7 +22,6 @@ use std::{sync::Arc, task::Poll}; -use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{ future, @@ -37,6 +36,8 @@ use tari_comms::{ peer_manager::{NodeId, NodeIdentity, Peer}, pipeline::PipelineError, types::CommsPublicKey, + Bytes, + BytesMut, }; use tari_crypto::{keys::PublicKey, tari_utilities::epoch_time::EpochTime}; use tari_utilities::{hex::Hex, ByteArray}; @@ -238,7 +239,7 @@ where S: Service async fn handle_send_message( &mut self, params: FinalSendMessageParams, - body: Bytes, + body: BytesMut, reply_tx: oneshot::Sender, ) -> Result, DhtOutboundError> { trace!(target: LOG_TARGET, "Send params: {:?}", params); @@ -405,7 +406,7 @@ where S: Service extra_flags: DhtMessageFlags, force_origin: bool, is_broadcast: bool, - body: Bytes, + body: BytesMut, expires: Option>, tag: Option, ) -> Result<(Vec, Vec), DhtOutboundError> { @@ -485,7 +486,7 @@ where S: Service message_type: DhtMessageType, flags: DhtMessageFlags, expires: Option, - body: Bytes, + mut body: BytesMut, ) -> Result { match encryption { OutboundEncryption::EncryptFor(public_key) => { @@ -497,7 +498,8 @@ where S: Service // Generate key message for encryption of message let key_message = crypt::generate_key_message(&shared_ephemeral_secret); // Encrypt the message with the body with key message above - let encrypted_body = crypt::encrypt(&key_message, &body)?; + crypt::encrypt(&key_message, &mut body)?; + let encrypted_body = body.freeze(); // Produce domain separated signature signature let mac_signature = crypt::create_message_domain_separated_hash_parts( @@ -525,7 +527,7 @@ where S: Service Ok(( Some(Arc::new(e_public_key)), Some(encrypted_message_signature.into()), - encrypted_body.into(), + encrypted_body, )) }, OutboundEncryption::ClearText => { @@ -546,9 +548,9 @@ where S: Service &binding_message_representation, ) .to_proto(); - Ok((None, Some(signature.to_encoded_bytes().into()), body)) + Ok((None, Some(signature.to_encoded_bytes().into()), body.freeze())) } else { - Ok((None, None, body)) + Ok((None, None, body.freeze())) } }, } @@ -633,7 +635,7 @@ mod test { service .call(DhtOutboundRequest::SendMessage( Box::new(SendMessageParams::new().flood(vec![]).finish()), - b"custom_msg".to_vec().into(), + b"custom_msg".as_slice().into(), reply_tx, )) .await @@ -680,7 +682,7 @@ mod test { .with_discovery(false) .finish(), ), - Bytes::from_static(b"custom_msg"), + BytesMut::from(b"custom_msg".as_slice()), reply_tx, )) .await @@ -728,7 +730,7 @@ mod test { .with_discovery(true) .finish(), ), - b"custom_msg".to_vec().into(), + b"custom_msg".as_slice().into(), reply_tx, )) .await diff --git a/comms/dht/src/outbound/message.rs b/comms/dht/src/outbound/message.rs index 544287e090..588cbb6929 100644 --- a/comms/dht/src/outbound/message.rs +++ b/comms/dht/src/outbound/message.rs @@ -22,11 +22,12 @@ use std::{fmt, fmt::Display, sync::Arc}; -use bytes::Bytes; use tari_comms::{ message::{MessageTag, MessagingReplyTx}, peer_manager::NodeId, types::CommsPublicKey, + Bytes, + BytesMut, }; use tari_utilities::hex::Hex; use thiserror::Error; @@ -145,7 +146,11 @@ impl SendMessageResponse { #[derive(Debug)] pub enum DhtOutboundRequest { /// Send a message using the given broadcast strategy - SendMessage(Box, Bytes, oneshot::Sender), + SendMessage( + Box, + BytesMut, + oneshot::Sender, + ), } impl fmt::Display for DhtOutboundRequest { diff --git a/comms/dht/src/outbound/mock.rs b/comms/dht/src/outbound/mock.rs index 7d7b58d926..c36640f13c 100644 --- a/comms/dht/src/outbound/mock.rs +++ b/comms/dht/src/outbound/mock.rs @@ -21,15 +21,17 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ + mem, sync::Arc, time::{Duration, Instant}, }; -use bytes::Bytes; +use chacha20::Nonce; use log::*; use tari_comms::{ message::{MessageTag, MessagingReplyTx}, protocol::messaging::SendFailReason, + BytesMut, }; use tokio::{ sync::{mpsc, oneshot, watch, Mutex, RwLock}, @@ -61,7 +63,7 @@ pub fn create_outbound_service_mock(size: usize) -> (OutboundMessageRequester, O #[derive(Clone)] pub struct OutboundServiceMockState { #[allow(clippy::type_complexity)] - calls: Arc>>, + calls: Arc>>, next_response: Arc>>, notif_sender: Arc>, notif_reciever: watch::Receiver<()>, @@ -121,17 +123,36 @@ impl OutboundServiceMockState { self.next_response.write().await.take() } - pub async fn add_call(&self, req: (FinalSendMessageParams, Bytes)) { + async fn add_call(&self, req: (FinalSendMessageParams, BytesMut)) { self.calls.lock().await.push(req); let _r = self.notif_sender.send(()); } - pub async fn take_calls(&self) -> Vec<(FinalSendMessageParams, Bytes)> { - self.calls.lock().await.drain(..).collect() + pub async fn take_calls(&self) -> Vec<(FinalSendMessageParams, BytesMut)> { + self.calls + .lock() + .await + .drain(..) + .map(|(p, mut b)| { + if p.encryption.is_encrypt() { + // Remove prefix data + (p, b.split_off(mem::size_of::() + mem::size_of::())) + } else { + (p, b) + } + }) + .collect() } - pub async fn pop_call(&self) -> Option<(FinalSendMessageParams, Bytes)> { - self.calls.lock().await.pop() + pub async fn pop_call(&self) -> Option<(FinalSendMessageParams, BytesMut)> { + self.calls.lock().await.pop().map(|(p, mut b)| { + if p.encryption.is_encrypt() { + // Remove prefix data + (p, b.split_off(mem::size_of::() + mem::size_of::())) + } else { + (p, b) + } + }) } pub async fn set_behaviour(&self, behaviour: MockBehaviour) { @@ -232,7 +253,7 @@ impl OutboundServiceMock { async fn add_call( &mut self, params: FinalSendMessageParams, - body: Bytes, + body: BytesMut, ) -> (SendMessageResponse, MessagingReplyTx) { self.mock_state.add_call((params, body)).await; let (inner_reply_tx, inner_reply_rx) = oneshot::channel(); diff --git a/comms/dht/src/outbound/requester.rs b/comms/dht/src/outbound/requester.rs index 0b1e38e9ee..0ac3e2e619 100644 --- a/comms/dht/src/outbound/requester.rs +++ b/comms/dht/src/outbound/requester.rs @@ -21,11 +21,12 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use log::*; -use tari_comms::{message::MessageExt, peer_manager::NodeId, types::CommsPublicKey, wrap_in_envelope_body}; +use tari_comms::{peer_manager::NodeId, types::CommsPublicKey, wrap_in_envelope_body, BytesMut}; use tokio::sync::{mpsc, oneshot}; use super::message::DhtOutboundRequest; use crate::{ + crypt::prepare_message, domain_message::OutboundDomainMessage, envelope::NodeDestination, outbound::{ @@ -259,7 +260,8 @@ impl OutboundMessageRequester { } else { message.to_propagation_header() }; - let body = wrap_in_envelope_body!(header, message.into_inner()).to_encoded_bytes(); + let msg = wrap_in_envelope_body!(header, message.into_inner()); + let body = prepare_message(params.encryption.is_encrypt(), &msg); self.send_raw(params, body).await } @@ -275,7 +277,8 @@ impl OutboundMessageRequester { if cfg!(debug_assertions) { trace!(target: LOG_TARGET, "Send Message: {} {:?}", params, message); } - let body = wrap_in_envelope_body!(message).to_encoded_bytes(); + let msg = wrap_in_envelope_body!(message); + let body = prepare_message(params.encryption.is_encrypt(), &msg); self.send_raw(params, body).await } @@ -291,7 +294,8 @@ impl OutboundMessageRequester { if cfg!(debug_assertions) { trace!(target: LOG_TARGET, "Send Message: {} {:?}", params, message); } - let body = wrap_in_envelope_body!(message).to_encoded_bytes(); + let msg = wrap_in_envelope_body!(message); + let body = prepare_message(params.encryption.is_encrypt(), &msg); self.send_raw_no_wait(params, body).await } @@ -299,11 +303,11 @@ impl OutboundMessageRequester { pub async fn send_raw( &mut self, params: FinalSendMessageParams, - body: Vec, + body: BytesMut, ) -> Result { let (reply_tx, reply_rx) = oneshot::channel(); self.sender - .send(DhtOutboundRequest::SendMessage(Box::new(params), body.into(), reply_tx)) + .send(DhtOutboundRequest::SendMessage(Box::new(params), body, reply_tx)) .await?; reply_rx @@ -315,11 +319,11 @@ impl OutboundMessageRequester { pub async fn send_raw_no_wait( &mut self, params: FinalSendMessageParams, - body: Vec, + body: BytesMut, ) -> Result<(), DhtOutboundError> { let (reply_tx, _) = oneshot::channel(); self.sender - .send(DhtOutboundRequest::SendMessage(Box::new(params), body.into(), reply_tx)) + .send(DhtOutboundRequest::SendMessage(Box::new(params), body, reply_tx)) .await?; Ok(()) } diff --git a/comms/dht/src/outbound/serialize.rs b/comms/dht/src/outbound/serialize.rs index 0d47a64cc5..ff0e8fe745 100644 --- a/comms/dht/src/outbound/serialize.rs +++ b/comms/dht/src/outbound/serialize.rs @@ -97,7 +97,7 @@ where message_tag: tag.as_value(), expires, }); - let envelope = DhtEnvelope::new(dht_header, &body); + let envelope = DhtEnvelope::new(dht_header, body.into()); let body = Bytes::from(envelope.to_encoded_bytes()); diff --git a/comms/dht/src/store_forward/saf_handler/task.rs b/comms/dht/src/store_forward/saf_handler/task.rs index 0aada15e4e..7f5390d382 100644 --- a/comms/dht/src/store_forward/saf_handler/task.rs +++ b/comms/dht/src/store_forward/saf_handler/task.rs @@ -34,6 +34,7 @@ use tari_comms::{ peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerManagerError}, pipeline::PipelineError, types::CommsPublicKey, + BytesMut, }; use tari_utilities::{convert::try_convert_all, ByteArray}; use tokio::sync::mpsc; @@ -563,9 +564,10 @@ where S: Service ); let key_message = crypt::generate_key_message(&shared_secret); - let decrypted_bytes = crypt::decrypt(&key_message, body)?; + let mut decrypted_bytes = BytesMut::from(body); + crypt::decrypt(&key_message, &mut decrypted_bytes)?; let envelope_body = - EnvelopeBody::decode(decrypted_bytes.as_slice()).map_err(|_| StoreAndForwardError::DecryptionFailed)?; + EnvelopeBody::decode(decrypted_bytes.freeze()).map_err(|_| StoreAndForwardError::DecryptionFailed)?; if envelope_body.is_empty() { return Err(StoreAndForwardError::InvalidEnvelopeBody); } @@ -702,7 +704,7 @@ mod test { None, make_dht_inbound_message( &node_identity, - b"Keep this for others please".to_vec(), + &b"Keep this for others please".to_vec(), DhtMessageFlags::ENCRYPTED, true, false, @@ -793,10 +795,9 @@ mod test { sleep(Duration::from_secs(5)).await; } assert_eq!(oms_mock_state.call_count().await, 1); - let call = oms_mock_state.pop_call().await.unwrap(); + let (_, body) = oms_mock_state.pop_call().await.unwrap(); - let body = call.1.to_vec(); - let body = EnvelopeBody::decode(body.as_slice()).unwrap(); + let body = EnvelopeBody::decode(body).unwrap(); let msg = body.decode_part::(0).unwrap().unwrap(); assert_eq!(msg.messages().len(), 1); @@ -827,19 +828,19 @@ mod test { let node_identity = make_node_identity(); - let msg_a = wrap_in_envelope_body!(&b"A".to_vec()).to_encoded_bytes(); + let msg_a = wrap_in_envelope_body!(&b"A".to_vec()); let inbound_msg_a = - make_dht_inbound_message(&node_identity, msg_a.clone(), DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); // Need to know the peer to process a stored message peer_manager .add_peer(Clone::clone(&*inbound_msg_a.source_peer)) .await .unwrap(); - let msg_b = &wrap_in_envelope_body!(b"B".to_vec()).to_encoded_bytes(); + let msg_b = wrap_in_envelope_body!(b"B".to_vec()); let inbound_msg_b = - make_dht_inbound_message(&node_identity, msg_b.clone(), DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + make_dht_inbound_message(&node_identity, &msg_b, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); // Need to know the peer to process a stored message peer_manager .add_peer(Clone::clone(&*inbound_msg_b.source_peer)) @@ -856,20 +857,14 @@ mod test { let msg2 = ProtoStoredMessage::new(0, inbound_msg_b.dht_header, inbound_msg_b.body, msg2_time); // Cleartext message - let clear_msg = wrap_in_envelope_body!(b"Clear".to_vec()).to_encoded_bytes(); - let clear_header = make_dht_inbound_message( - &node_identity, - clear_msg.clone(), - DhtMessageFlags::empty(), - false, - false, - ) - .unwrap() - .dht_header; + let clear_msg = wrap_in_envelope_body!(b"Clear".to_vec()); + let clear_header = make_dht_inbound_message(&node_identity, &clear_msg, DhtMessageFlags::empty(), false, false) + .unwrap() + .dht_header; let msg_clear_time = Utc::now() .checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(120)).unwrap()) .unwrap(); - let msg_clear = ProtoStoredMessage::new(0, clear_header, clear_msg, msg_clear_time); + let msg_clear = ProtoStoredMessage::new(0, clear_header, clear_msg.to_encoded_bytes(), msg_clear_time); let mut message = DecryptedDhtMessage::succeeded( wrap_in_envelope_body!(StoredMessagesResponse { messages: vec![msg1.clone(), msg2, msg_clear], @@ -879,7 +874,7 @@ mod test { None, make_dht_inbound_message( &node_identity, - b"Stored message".to_vec(), + &b"Stored message".to_vec(), DhtMessageFlags::ENCRYPTED, true, false, @@ -950,9 +945,9 @@ mod test { let node_identity = make_node_identity(); - let msg_a = wrap_in_envelope_body!(&b"A".to_vec()).to_encoded_bytes(); + let msg_a = wrap_in_envelope_body!(&b"A".to_vec()); let inbound_msg_a = - make_dht_inbound_message(&node_identity, msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); peer_manager .add_peer(Clone::clone(&*inbound_msg_a.source_peer)) .await @@ -973,7 +968,7 @@ mod test { None, make_dht_inbound_message( &node_identity, - b"Stored message".to_vec(), + &b"Stored message".to_vec(), DhtMessageFlags::ENCRYPTED, true, false, @@ -1023,9 +1018,9 @@ mod test { let node_identity = make_node_identity(); - let msg_a = wrap_in_envelope_body!(&b"A".to_vec()).to_encoded_bytes(); + let msg_a = wrap_in_envelope_body!(&b"A".to_vec()); let inbound_msg_a = - make_dht_inbound_message(&node_identity, msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); peer_manager .add_peer(Clone::clone(&*inbound_msg_a.source_peer)) .await @@ -1046,7 +1041,7 @@ mod test { None, make_dht_inbound_message( &node_identity, - b"Stored message".to_vec(), + &b"Stored message".to_vec(), DhtMessageFlags::ENCRYPTED, true, false, diff --git a/comms/dht/src/store_forward/store.rs b/comms/dht/src/store_forward/store.rs index 61519cd8ca..c0d2b8d224 100644 --- a/comms/dht/src/store_forward/store.rs +++ b/comms/dht/src/store_forward/store.rs @@ -483,7 +483,7 @@ mod test { let inbound_msg = make_dht_inbound_message( &make_node_identity(), - b"".to_vec(), + &b"".to_vec(), DhtMessageFlags::empty(), false, false, @@ -509,7 +509,7 @@ mod test { let msg_node_identity = make_node_identity(); let inbound_msg = make_dht_inbound_message( &msg_node_identity, - b"This shouldnt be stored".to_vec(), + &b"This shouldnt be stored".to_vec(), DhtMessageFlags::ENCRYPTED, true, false, @@ -539,7 +539,7 @@ mod test { let mut inbound_msg = make_dht_inbound_message( &origin_node_identity, - b"Will you keep this for me?".to_vec(), + &b"Will you keep this for me?".to_vec(), DhtMessageFlags::ENCRYPTED, true, false, @@ -582,7 +582,7 @@ mod test { let mut inbound_msg = make_dht_inbound_message( &origin_node_identity, - b"Will you keep this for me?".to_vec(), + &b"Will you keep this for me?".to_vec(), DhtMessageFlags::ENCRYPTED, true, false, diff --git a/comms/dht/src/test_utils/makers.rs b/comms/dht/src/test_utils/makers.rs index 7646346b3a..c9ba154d98 100644 --- a/comms/dht/src/test_utils/makers.rs +++ b/comms/dht/src/test_utils/makers.rs @@ -36,6 +36,7 @@ use tari_test_utils::{paths::create_temporary_data_path, random}; use crate::{ crypt, + crypt::prepare_message, envelope::{DhtMessageFlags, DhtMessageHeader, NodeDestination}, inbound::DhtInboundMessage, message_signature::MessageSignature, @@ -123,9 +124,9 @@ pub fn make_valid_message_signature(node_identity: &NodeIdentity, message: &[u8] .to_encoded_bytes() } -pub fn make_dht_inbound_message( +pub fn make_dht_inbound_message( node_identity: &NodeIdentity, - body: Vec, + body: &T, flags: DhtMessageFlags, include_origin: bool, include_destination: bool, @@ -148,24 +149,65 @@ pub fn make_dht_inbound_message( )) } +pub fn make_dht_inbound_message_raw( + node_identity: &NodeIdentity, + body: Vec, + flags: DhtMessageFlags, + include_origin: bool, + include_destination: bool, +) -> Result { + let msg_tag = MessageTag::new(); + let (e_secret_key, e_public_key) = make_keypair(); + let header = make_dht_header( + node_identity, + &e_public_key, + &e_secret_key, + &body, + flags, + include_origin, + msg_tag, + include_destination, + )? + .into(); + let envelope = DhtEnvelope::new(header, body); + Ok(DhtInboundMessage::new( + msg_tag, + envelope.header.unwrap().try_into().unwrap(), + Arc::new(Peer::new( + node_identity.public_key().clone(), + node_identity.node_id().clone(), + Vec::::new().into(), + PeerFlags::empty(), + PeerFeatures::COMMUNICATION_NODE, + Default::default(), + Default::default(), + )), + envelope.body, + )) +} + pub fn make_keypair() -> (CommsSecretKey, CommsPublicKey) { CommsPublicKey::random_keypair(&mut OsRng) } -pub fn make_dht_envelope( +pub fn make_dht_envelope( node_identity: &NodeIdentity, - mut message: Vec, + message: &T, flags: DhtMessageFlags, include_origin: bool, trace: MessageTag, include_destination: bool, ) -> Result { let (e_secret_key, e_public_key) = make_keypair(); - if flags.is_encrypted() { + let message = if flags.is_encrypted() { let shared_secret = crypt::generate_ecdh_secret(&e_secret_key, node_identity.public_key()); let key_message = crypt::generate_key_message(&shared_secret); - message = crypt::encrypt(&key_message, &message).unwrap(); - } + let mut message = prepare_message(true, message); + crypt::encrypt(&key_message, &mut message).unwrap(); + message.freeze() + } else { + prepare_message(false, message).freeze() + }; let header = make_dht_header( node_identity, &e_public_key, @@ -177,7 +219,7 @@ pub fn make_dht_envelope( include_destination, )? .into(); - Ok(DhtEnvelope::new(header, &message.into())) + Ok(DhtEnvelope::new(header, message.into())) } pub fn build_peer_manager() -> Arc { diff --git a/comms/dht/src/test_utils/mod.rs b/comms/dht/src/test_utils/mod.rs index 03f531c075..39e8fff377 100644 --- a/comms/dht/src/test_utils/mod.rs +++ b/comms/dht/src/test_utils/mod.rs @@ -45,7 +45,9 @@ pub use dht_actor_mock::{create_dht_actor_mock, DhtMockState}; mod dht_discovery_mock; pub use dht_discovery_mock::{create_dht_discovery_mock, DhtDiscoveryMockState}; +#[cfg(test)] mod makers; +#[cfg(test)] pub use makers::*; mod service; diff --git a/comms/dht/tests/dht.rs b/comms/dht/tests/dht.rs index 9928c1df79..55e9da8e72 100644 --- a/comms/dht/tests/dht.rs +++ b/comms/dht/tests/dht.rs @@ -33,7 +33,6 @@ use tari_comms::{ protocol::messaging::{MessagingEvent, MessagingEventSender, MessagingProtocolExtension}, transports::MemoryTransport, types::CommsDatabase, - wrap_in_envelope_body, CommsBuilder, CommsNode, }; @@ -419,16 +418,13 @@ async fn dht_store_forward() { node_A .dht .outbound_requester() - .send_raw( - params.clone(), - wrap_in_envelope_body!(secret_msg1.to_vec()).to_encoded_bytes(), - ) + .send_message_no_header(params.clone(), secret_msg1.to_vec()) .await .unwrap(); node_A .dht .outbound_requester() - .send_raw(params, wrap_in_envelope_body!(secret_msg2.to_vec()).to_encoded_bytes()) + .send_message_no_header(params, secret_msg2.to_vec()) .await .unwrap(); @@ -722,7 +718,7 @@ async fn dht_do_not_store_invalid_message_in_dedup() { // Get the message that was received by Node B let mut msg = node_B.next_inbound_message(Duration::from_secs(10)).await.unwrap(); - let bytes = msg.decryption_result.unwrap().to_encoded_bytes(); + let bytes = msg.decryption_result.unwrap().encode_into_bytes_mut(); // Clone header without modification let header_unmodified = msg.dht_header.clone(); @@ -972,9 +968,9 @@ async fn dht_propagate_message_contents_not_malleable_ban() { let msg = node_B.next_inbound_message(Duration::from_secs(10)).await.unwrap(); - let mut bytes = msg.decryption_result.unwrap().to_encoded_bytes(); + let mut envelope = msg.decryption_result.unwrap(); // Change the message - bytes.push(0x42); + envelope.push_part([0x42].to_vec()); let mut connectivity_events = node_C.comms.connectivity().get_event_subscription(); @@ -982,7 +978,7 @@ async fn dht_propagate_message_contents_not_malleable_ban() { node_B .dht .outbound_requester() - .send_raw( + .send_message_no_header( SendMessageParams::new() .propagate(node_B.node_identity().public_key().clone().into(), vec![msg .source_peer @@ -990,7 +986,7 @@ async fn dht_propagate_message_contents_not_malleable_ban() { .clone()]) .with_dht_header(msg.dht_header) .finish(), - bytes, + envelope, ) .await .unwrap(); @@ -1016,7 +1012,6 @@ async fn dht_propagate_message_contents_not_malleable_ban() { #[tokio::test] #[allow(non_snake_case)] async fn dht_header_not_malleable() { - env_logger::init(); let node_C = make_node("node_C", PeerFeatures::COMMUNICATION_NODE, dht_config(), None).await; // Node B knows about Node C let mut node_B = make_node( @@ -1081,14 +1076,14 @@ async fn dht_header_not_malleable() { // Modify the header msg.dht_header.message_type = DhtMessageType::from_i32(21i32).unwrap(); - let bytes = msg.decryption_result.unwrap().to_encoded_bytes(); + let envelope = msg.decryption_result.unwrap(); let mut connectivity_events = node_C.comms.connectivity().get_event_subscription(); // Propagate the changed message (to node C) node_B .dht .outbound_requester() - .send_raw( + .send_message_no_header( SendMessageParams::new() .propagate(node_B.node_identity().public_key().clone().into(), vec![msg .source_peer @@ -1096,7 +1091,7 @@ async fn dht_header_not_malleable() { .clone()]) .with_dht_header(msg.dht_header) .finish(), - bytes, + envelope, ) .await .unwrap();