From db7da5b1cef6cf597172ca86f96818d93b3c59f9 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Tue, 1 Oct 2019 14:15:25 +0200 Subject: [PATCH] Inbound DHT middleware This PR adds the vaious inbound DHT middleware components. The module is currently not part of the module tree as there are a number of changes to the comms and tari_middleware crates that need to go along with it. The `tari_comms_dht` crate adds DHT functionality to `tari_comms`. It provides two sets of middleware (_inbound_ and _outbound_) which process incoming requests and outgoing messages respectively. _Attaching to comms_ In `tari_comms`, incoming and outgoing messages are connected using two mpsc sender/receiver pairs. One for incoming messages (receiving `InboundMessage`s) and one for outbound messages (sending `OutboundMessage`s). The DHT module consists of two middleware layers (as in `tower_layer::Layer`) which form an inbound and outbound pipeline for augmenting messages. _Inbound Message Flow_ `InboundMessage`s are received from the incoming comms channel (as in the receiver side of of the mpsc channel which goes into `CommsBuilder::new().incoming_message_sink(sender)`). Typically, a `ServicePipeline` from the `tari_comms_middleware` crate is used to connect a stream from comms to the middleware service. `InboundMessage`(comms) -> _DHT Inbound Middleware_ -> `DhtInboundMessage`(domain) The DHT inbound middleware consist of: * `DeserializeMiddleware` deserializes the body of an `InboundMessage` into a `DhtEnvelope`. * `DecryptionMiddleware` attempts to decrypt the body of a `DhtEnvelope` if required. The result of that decryption (success or failure) is passed to the next service. * `ForwardMiddleware` uses the result of the decryption to determine if the message is destined for this node or not. If not, the message will be forwarded to the applicable peers using the OutboundRequester (i.e. the outbound DHT middleware). * `DhtHandlerMiddleware` handles DHT messages, such as `Join` and `Discover`. If the messages are _not_ DHT messages the `next_service` is called. --- comms/dht/src/inbound/decryption.rs | 225 +++++++++++++++++ comms/dht/src/inbound/deserialize.rs | 156 ++++++++++++ comms/dht/src/inbound/dht_handler/layer.rs | 65 +++++ comms/dht/src/inbound/dht_handler/message.rs | 42 ++++ .../dht/src/inbound/dht_handler/middleware.rs | 84 +++++++ comms/dht/src/inbound/dht_handler/mod.rs | 26 ++ comms/dht/src/inbound/dht_handler/task.rs | 187 ++++++++++++++ comms/dht/src/inbound/error.rs | 38 +++ comms/dht/src/inbound/forward.rs | 237 ++++++++++++++++++ comms/dht/src/inbound/message.rs | 108 ++++++++ comms/dht/src/inbound/mod.rs | 36 +++ 11 files changed, 1204 insertions(+) create mode 100644 comms/dht/src/inbound/decryption.rs create mode 100644 comms/dht/src/inbound/deserialize.rs create mode 100644 comms/dht/src/inbound/dht_handler/layer.rs create mode 100644 comms/dht/src/inbound/dht_handler/message.rs create mode 100644 comms/dht/src/inbound/dht_handler/middleware.rs create mode 100644 comms/dht/src/inbound/dht_handler/mod.rs create mode 100644 comms/dht/src/inbound/dht_handler/task.rs create mode 100644 comms/dht/src/inbound/error.rs create mode 100644 comms/dht/src/inbound/forward.rs create mode 100644 comms/dht/src/inbound/message.rs create mode 100644 comms/dht/src/inbound/mod.rs diff --git a/comms/dht/src/inbound/decryption.rs b/comms/dht/src/inbound/decryption.rs new file mode 100644 index 0000000000..936a478ac8 --- /dev/null +++ b/comms/dht/src/inbound/decryption.rs @@ -0,0 +1,225 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{ + inbound::message::{DecryptedDhtMessage, DhtInboundMessage}, + message::DhtMessageFlags, +}; +use futures::{task::Context, Future, Poll}; +use log::*; +use std::sync::Arc; +use tari_comms::{message::Message, peer_manager::NodeIdentity, utils::crypt}; +use tari_comms_middleware::MiddlewareError; +use tari_utilities::message_format::MessageFormat; +use tower::{layer::Layer, Service, ServiceExt}; + +const LOG_TARGET: &'static str = "comms::middleware::encryption"; + +/// This layer is responsible for attempting to decrypt inbound messages. +pub struct DecryptionLayer { + node_identity: Arc, +} + +impl DecryptionLayer { + pub fn new(node_identity: Arc) -> Self { + Self { node_identity } + } +} + +impl Layer for DecryptionLayer { + type Service = DecryptionService; + + fn layer(&self, service: S) -> Self::Service { + DecryptionService::new(service, Arc::clone(&self.node_identity)) + } +} + +/// Responsible for decrypting InboundMessages and passing a DecryptedInboundMessage to the given service +#[derive(Clone)] +pub struct DecryptionService { + node_identity: Arc, + inner: S, +} + +impl DecryptionService { + pub fn new(service: S, node_identity: Arc) -> Self { + Self { + inner: service, + node_identity, + } + } +} + +impl Service for DecryptionService +where + S: Service + Clone, + S::Error: Into, +{ + type Error = MiddlewareError; + type Response = (); + + type Future = impl Future>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, msg: DhtInboundMessage) -> Self::Future { + Self::handle_message(self.inner.clone(), Arc::clone(&self.node_identity), msg) + } +} + +impl DecryptionService +where + S: Service, + S::Error: Into, +{ + async fn handle_message( + next_service: S, + node_identity: Arc, + message: DhtInboundMessage, + ) -> Result<(), MiddlewareError> + { + let dht_header = &message.dht_header; + if !dht_header.flags.contains(DhtMessageFlags::ENCRYPTED) { + return Self::success_not_encrypted(next_service, message).await; + } + + debug!(target: LOG_TARGET, "Attempting to decrypt message"); + let shared_secret = crypt::generate_ecdh_secret(&node_identity.secret_key, &dht_header.origin_public_key); + match crypt::decrypt(&shared_secret, &message.body) { + Ok(decrypted) => Self::decryption_succeeded(next_service, message, decrypted).await, + Err(err) => { + debug!(target: LOG_TARGET, "Unable to decrypt message: {}", err); + Self::decryption_failed(next_service, message).await + }, + } + } + + async fn decryption_succeeded( + mut next_service: S, + message: DhtInboundMessage, + decrypted: Vec, + ) -> Result<(), MiddlewareError> + { + next_service.ready().await.map_err(Into::into)?; + // This `Message` was created in the OutboundMessageRequester. Deserialization is done here + // to determine if the decryption produced valid bytes or not. + match Message::from_binary(&decrypted) { + Ok(deserialized) => { + debug!(target: LOG_TARGET, "Message successfully decrypted"); + let msg = DecryptedDhtMessage::succeed(deserialized, message); + next_service.call(msg).await.map_err(Into::into) + }, + Err(err) => { + debug!(target: LOG_TARGET, "Unable to deserialize message: {}", err); + Self::decryption_failed(next_service, message).await + }, + } + } + + async fn success_not_encrypted(mut next_service: S, message: DhtInboundMessage) -> Result<(), MiddlewareError> { + match Message::from_binary(&message.body) { + Ok(deserialized) => { + debug!(target: LOG_TARGET, "Message successfully decrypted"); + let msg = DecryptedDhtMessage::succeed(deserialized, message); + next_service.ready().await.map_err(Into::into)?; + next_service.call(msg).await.map_err(Into::into) + }, + Err(err) => { + debug!(target: LOG_TARGET, "Unable to deserialize message: {}", err); + Self::decryption_failed(next_service, message).await + }, + } + } + + async fn decryption_failed(mut next_service: S, message: DhtInboundMessage) -> Result<(), MiddlewareError> { + let msg = DecryptedDhtMessage::fail(message); + next_service.ready().await.map_err(Into::into)?; + next_service.call(msg).await.map_err(Into::into) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + message::DhtMessageFlags, + test_utils::{make_dht_inbound_message, make_node_identity, service_fn}, + }; + use futures::{executor::block_on, future}; + use std::sync::Mutex; + use tari_test_utils::counter_context; + + #[test] + fn poll_ready() { + let inner = service_fn(|_: DecryptedDhtMessage| future::ready(Result::<(), MiddlewareError>::Ok(()))); + let node_identity = make_node_identity(); + let mut service = DecryptionService::new(inner, node_identity); + + counter_context!(cx, counter); + + assert!(service.poll_ready(&mut cx).is_ready()); + + assert_eq!(counter.get(), 0); + } + + #[test] + fn decrypt_inbound_success() { + let result = Mutex::new(None); + let inner = service_fn(|msg: DecryptedDhtMessage| { + *result.lock().unwrap() = Some(msg); + future::ready(Result::<(), MiddlewareError>::Ok(())) + }); + let node_identity = make_node_identity(); + let mut service = DecryptionService::new(inner, Arc::clone(&node_identity)); + + let plain_text_msg = Message::from_message_format((), ()).unwrap(); + let secret_key = crypt::generate_ecdh_secret(&node_identity.secret_key, &node_identity.identity.public_key); + let encrypted = crypt::encrypt(&secret_key, &plain_text_msg.to_binary().unwrap()).unwrap(); + let inbound_msg = make_dht_inbound_message(&node_identity, encrypted, DhtMessageFlags::ENCRYPTED); + + block_on(service.call(inbound_msg)).unwrap(); + let decrypted = result.lock().unwrap().take().unwrap(); + assert_eq!(decrypted.decryption_succeeded(), true); + assert_eq!(decrypted.decryption_result.unwrap(), plain_text_msg); + } + + #[test] + fn decrypt_inbound_fail() { + let result = Mutex::new(None); + let inner = service_fn(|msg: DecryptedDhtMessage| { + *result.lock().unwrap() = Some(msg); + future::ready(Result::<(), MiddlewareError>::Ok(())) + }); + let node_identity = make_node_identity(); + let mut service = DecryptionService::new(inner, Arc::clone(&node_identity)); + + let nonsense = "Cannot Decrypt this".as_bytes().to_vec(); + let inbound_msg = make_dht_inbound_message(&node_identity, nonsense.clone(), DhtMessageFlags::ENCRYPTED); + + block_on(service.call(inbound_msg)).unwrap(); + let decrypted = result.lock().unwrap().take().unwrap(); + assert_eq!(decrypted.decryption_succeeded(), false); + assert_eq!(decrypted.decryption_result.unwrap_err(), nonsense); + } +} diff --git a/comms/dht/src/inbound/deserialize.rs b/comms/dht/src/inbound/deserialize.rs new file mode 100644 index 0000000000..49e47d43ad --- /dev/null +++ b/comms/dht/src/inbound/deserialize.rs @@ -0,0 +1,156 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{inbound::DhtInboundMessage, message::DhtEnvelope}; +use futures::{task::Context, Future, Poll}; +use log::*; +use tari_comms::message::InboundMessage; +use tari_comms_middleware::{error::box_as_middleware_error, MiddlewareError}; +use tari_utilities::message_format::MessageFormat; +use tower::{layer::Layer, Service, ServiceExt}; + +const LOG_TARGET: &'static str = "comms::dht::deserialize"; + +/// # DHT Deserialization middleware +/// +/// Takes in an `InboundMessage` and deserializes the body into a [DhtEnvelope]. +/// The `next_service` is called with a constructed [DhtInboundMessage] which contains +/// the relevant comms-level and dht-level information. +#[derive(Clone)] +pub struct DhtDeserializeMiddleware { + next_service: S, +} + +impl DhtDeserializeMiddleware { + pub fn new(service: S) -> Self { + Self { next_service: service } + } +} + +impl Service for DhtDeserializeMiddleware +where + S: Service + Clone + 'static, + S::Error: Into, +{ + type Error = MiddlewareError; + type Response = (); + + type Future = impl Future>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, msg: InboundMessage) -> Self::Future { + Self::deserialize(self.next_service.clone(), msg) + } +} + +impl DhtDeserializeMiddleware +where + S: Service, + S::Error: Into, +{ + pub async fn deserialize(mut next_service: S, message: InboundMessage) -> Result<(), MiddlewareError> { + trace!(target: LOG_TARGET, "Deserializing InboundMessage"); + next_service.ready().await.map_err(Into::into)?; + match DhtEnvelope::from_binary(&message.body) { + Ok(dht_envelope) => { + trace!(target: LOG_TARGET, "Deserialization succeeded. Checking signatures"); + if !dht_envelope.is_signature_valid() { + // The origin signature is not valid, this message should never have been sent + warn!( + target: LOG_TARGET, + "SECURITY: Origin signature verification failed. Discarding message from NodeId {}", + message.source_peer.node_id + ); + return Ok(()); + } + + trace!(target: LOG_TARGET, "Origin signature validation passed."); + next_service + .call(DhtInboundMessage::new( + dht_envelope.header, + message.source_peer, + message.envelope_header, + dht_envelope.body, + )) + .await + .map_err(Into::into) + }, + Err(err) => { + error!(target: LOG_TARGET, "DHT deserialization failed: {}", err); + Err(box_as_middleware_error(err)) + }, + } + } +} + +pub struct DeserializeLayer; + +impl DeserializeLayer { + pub fn new() -> Self { + DeserializeLayer + } +} + +impl Layer for DeserializeLayer { + type Service = DhtDeserializeMiddleware; + + fn layer(&self, service: S) -> Self::Service { + DhtDeserializeMiddleware::new(service) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + message::DhtMessageFlags, + test_utils::{make_comms_inbound_message, make_dht_envelope, make_node_identity, service_spy}, + }; + use futures::executor::block_on; + use tari_comms::message::MessageFlags; + use tari_test_utils::panic_context; + + #[test] + fn deserialize() { + let spy = service_spy(); + let mut deserialize = DeserializeLayer::new().layer(spy.service::()); + + panic_context!(cx); + + assert!(deserialize.poll_ready(&mut cx).is_ready()); + let node_identity = make_node_identity(); + let dht_envelope = make_dht_envelope(&node_identity, b"A".to_vec(), DhtMessageFlags::empty()); + block_on(deserialize.call(make_comms_inbound_message( + &node_identity, + dht_envelope.to_binary().unwrap(), + MessageFlags::empty(), + ))) + .unwrap(); + + let msg = spy.pop_request().unwrap(); + assert_eq!(msg.body, b"A".to_vec()); + assert_eq!(msg.dht_header, dht_envelope.header); + } +} diff --git a/comms/dht/src/inbound/dht_handler/layer.rs b/comms/dht/src/inbound/dht_handler/layer.rs new file mode 100644 index 0000000000..a401ed0070 --- /dev/null +++ b/comms/dht/src/inbound/dht_handler/layer.rs @@ -0,0 +1,65 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use super::middleware::DhtHandlerMiddleware; +use crate::{config::DhtConfig, outbound::OutboundMessageRequester}; +use std::sync::Arc; +use tari_comms::peer_manager::{NodeIdentity, PeerManager}; +use tower::layer::Layer; + +pub struct DhtHandlerLayer { + config: DhtConfig, + peer_manager: Arc, + node_identity: Arc, + outbound_service: OutboundMessageRequester, +} + +impl DhtHandlerLayer { + pub fn new( + config: DhtConfig, + node_identity: Arc, + peer_manager: Arc, + outbound_service: OutboundMessageRequester, + ) -> Self + { + Self { + config, + node_identity, + peer_manager, + outbound_service, + } + } +} + +impl Layer for DhtHandlerLayer { + type Service = DhtHandlerMiddleware; + + fn layer(&self, service: S) -> Self::Service { + DhtHandlerMiddleware::new( + self.config.clone(), + service, + Arc::clone(&self.node_identity), + Arc::clone(&self.peer_manager), + self.outbound_service.clone(), + ) + } +} diff --git a/comms/dht/src/inbound/dht_handler/message.rs b/comms/dht/src/inbound/dht_handler/message.rs new file mode 100644 index 0000000000..65cb4b7bd9 --- /dev/null +++ b/comms/dht/src/inbound/dht_handler/message.rs @@ -0,0 +1,42 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use serde::{Deserialize, Serialize}; +use tari_comms::{connection::NetAddress, peer_manager::NodeId}; + +/// The JoinMessage stores the information required for a network join request. It has all the information required to +/// locate and contact the source node, but network behaviour is different compared to DiscoverMessage. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct JoinMessage { + pub node_id: NodeId, + // TODO: node_type + pub net_addresses: Vec, +} + +/// The DiscoverMessage stores the information required for a network discover request. It has all the information +/// required to locate and contact the source node, but network behaviour is different compared to JoinMessage. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct DiscoverMessage { + pub node_id: NodeId, + // TODO: node_type + pub net_addresses: Vec, +} diff --git a/comms/dht/src/inbound/dht_handler/middleware.rs b/comms/dht/src/inbound/dht_handler/middleware.rs new file mode 100644 index 0000000000..e4e769988d --- /dev/null +++ b/comms/dht/src/inbound/dht_handler/middleware.rs @@ -0,0 +1,84 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use super::task::ProcessDhtMessages; +use crate::{config::DhtConfig, inbound::DecryptedDhtMessage, outbound::OutboundMessageRequester}; +use futures::{task::Context, Future, Poll}; +use std::sync::Arc; +use tari_comms::peer_manager::{NodeIdentity, PeerManager}; +use tari_comms_middleware::MiddlewareError; +use tower::Service; + +#[derive(Clone)] +pub struct DhtHandlerMiddleware { + config: DhtConfig, + next_service: S, + peer_manager: Arc, + node_identity: Arc, + outbound_service: OutboundMessageRequester, +} + +impl DhtHandlerMiddleware { + pub fn new( + config: DhtConfig, + next_service: S, + node_identity: Arc, + peer_manager: Arc, + outbound_service: OutboundMessageRequester, + ) -> Self + { + Self { + config, + next_service, + node_identity, + peer_manager, + outbound_service, + } + } +} + +impl Service for DhtHandlerMiddleware +where + S: Service + Clone, + S::Error: Into, +{ + type Error = MiddlewareError; + type Response = (); + + type Future = impl Future>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.next_service.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, message: DecryptedDhtMessage) -> Self::Future { + ProcessDhtMessages::new( + self.config.clone(), + self.next_service.clone(), + Arc::clone(&self.peer_manager), + self.outbound_service.clone(), + Arc::clone(&self.node_identity), + message, + ) + .run() + } +} diff --git a/comms/dht/src/inbound/dht_handler/mod.rs b/comms/dht/src/inbound/dht_handler/mod.rs new file mode 100644 index 0000000000..2640bc73bf --- /dev/null +++ b/comms/dht/src/inbound/dht_handler/mod.rs @@ -0,0 +1,26 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +pub mod layer; +pub mod message; +pub mod middleware; +pub mod task; diff --git a/comms/dht/src/inbound/dht_handler/task.rs b/comms/dht/src/inbound/dht_handler/task.rs new file mode 100644 index 0000000000..7f14881863 --- /dev/null +++ b/comms/dht/src/inbound/dht_handler/task.rs @@ -0,0 +1,187 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use super::message::{DiscoverMessage, JoinMessage}; +use crate::{ + config::DhtConfig, + inbound::{error::DhtInboundError, message::DecryptedDhtMessage}, + message::{DhtMessageFlags, DhtMessageType}, + outbound::{BroadcastStrategy, OutboundMessageRequester}, +}; +use log::*; +use std::sync::Arc; +use tari_comms::{ + connection::NetAddress, + message::NodeDestination, + peer_manager::{NodeId, NodeIdentity, Peer, PeerFlags, PeerManager}, + types::CommsPublicKey, +}; +use tari_comms_middleware::MiddlewareError; +use tari_utilities::message_format::MessageFormat; +use tower::{Service, ServiceExt}; + +pub struct ProcessDhtMessages { + config: DhtConfig, + next_service: S, + peer_manager: Arc, + outbound_service: OutboundMessageRequester, + node_identity: Arc, + message: Option, +} + +impl ProcessDhtMessages +where + S: Service, + S::Error: Into, +{ + pub fn new( + config: DhtConfig, + next_service: S, + peer_manager: Arc, + outbound_service: OutboundMessageRequester, + node_identity: Arc, + message: DecryptedDhtMessage, + ) -> Self + { + Self { + config, + next_service, + peer_manager, + outbound_service, + node_identity, + message: Some(message), + } + } + + pub async fn run(mut self) -> Result<(), MiddlewareError> { + let message = self + .message + .take() + .expect("DhtInboundMessageTask initialized without message"); + match message.dht_header.message_type { + DhtMessageType::Join => self.handle_join(message).await.map_err(Into::into), + DhtMessageType::Discover => self.handle_discover(message).await.map_err(Into::into), + // Not a DHT message, call downstream middleware + DhtMessageType::None => { + self.next_service.ready().await.map_err(Into::into)?; + self.next_service.call(message).await.map_err(Into::into) + }, + } + } + + fn add_or_update_peer( + &self, + pubkey: &CommsPublicKey, + node_id: NodeId, + net_addresses: Vec, + ) -> Result<(), DhtInboundError> + { + let peer_manager = &self.peer_manager; + // Add peer or modify existing peer using received join request + if peer_manager.exists(pubkey)? { + peer_manager.update_peer(pubkey, Some(node_id), Some(net_addresses), None)?; + } else { + peer_manager.add_peer(Peer::new( + pubkey.clone(), + node_id, + net_addresses.into(), + PeerFlags::default(), + ))?; + } + + Ok(()) + } + + async fn handle_join(&mut self, message: DecryptedDhtMessage) -> Result<(), DhtInboundError> { + let join_msg = JoinMessage::from_binary(&message.inner_success().body)?; + + // TODO: Check/Verify the received peers information + self.add_or_update_peer( + &message.dht_header.origin_public_key, + join_msg.node_id.clone(), + join_msg.net_addresses, + )?; + + // Send a join request back to the source peer of the join request if that peer is from the same region + // of network. Also, only Send a join request back if this copy of the received join + // request was not sent directly from the original source peer but was forwarded. If it + // was not forwarded then that source peer already has the current peers info in its + // PeerManager. + if message.dht_header.origin_public_key != message.source_peer.public_key && + self.peer_manager.in_network_region( + &join_msg.node_id, + &self.node_identity.identity.node_id, + self.config.max_nodes_join_request, + )? + { + self.send_join_direct(message.dht_header.origin_public_key.clone()) + .await?; + } + + // Propagate message to closer peers + // oms.forward_message( + // BroadcastStrategy::Closest(ClosestRequest { + // n: DHT_BROADCAST_NODE_COUNT, + // node_id: join_msg.node_id.clone(), + // excluded_peers: vec![info.origin_source, info.peer_source.public_key], + // }), + // info.message_envelope, + // )?; + Ok(()) + } + + async fn handle_discover(&mut self, message: DecryptedDhtMessage) -> Result<(), DhtInboundError> { + let discover_msg = DiscoverMessage::from_binary(&message.inner_success().body)?; + // TODO: Check/Verify the received peers information + self.add_or_update_peer( + &message.dht_header.origin_public_key, + discover_msg.node_id, + discover_msg.net_addresses, + )?; + + // Send the origin the current nodes latest contact info + self.send_join_direct(message.dht_header.origin_public_key).await?; + + Ok(()) + } + + /// Send a network join update request directly to a specific known peer + async fn send_join_direct(&mut self, dest_public_key: CommsPublicKey) -> Result<(), DhtInboundError> { + let join_msg = JoinMessage { + node_id: self.node_identity.identity.node_id.clone(), + net_addresses: vec![self.node_identity.control_service_address()], + }; + + trace!("Sending direct join request to {}", dest_public_key); + self.outbound_service + .send_message( + BroadcastStrategy::DirectPublicKey(dest_public_key.clone()), + NodeDestination::PublicKey(dest_public_key), + DhtMessageFlags::ENCRYPTED, + DhtMessageType::Join, + join_msg, + ) + .await?; + + Ok(()) + } +} diff --git a/comms/dht/src/inbound/error.rs b/comms/dht/src/inbound/error.rs new file mode 100644 index 0000000000..d9c3682d0e --- /dev/null +++ b/comms/dht/src/inbound/error.rs @@ -0,0 +1,38 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::outbound::{BroadcastStrategyError, DhtOutboundError}; +use derive_error::Error; +use tari_comms::{message::MessageError, peer_manager::PeerManagerError}; +use tari_comms_middleware::impl_into_middleware_error; +use tari_utilities::message_format::MessageFormatError; + +#[derive(Debug, Error)] +pub enum DhtInboundError { + MessageError(MessageError), + MessageFormatError(MessageFormatError), + PeerManagerError(PeerManagerError), + DhtOutboundError(DhtOutboundError), + BroadcastStrategyError(BroadcastStrategyError), +} + +impl_into_middleware_error!(DhtInboundError); diff --git a/comms/dht/src/inbound/forward.rs b/comms/dht/src/inbound/forward.rs new file mode 100644 index 0000000000..9b5c68c2aa --- /dev/null +++ b/comms/dht/src/inbound/forward.rs @@ -0,0 +1,237 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{ + inbound::{error::DhtInboundError, message::DecryptedDhtMessage}, + outbound::{BroadcastStrategy, OutboundMessageRequester}, +}; +use futures::{task::Context, Future, Poll}; +use log::*; +use std::sync::Arc; +use tari_comms::peer_manager::{NodeIdentity, PeerManager}; +use tari_comms_middleware::MiddlewareError; +use tower::{layer::Layer, Service, ServiceExt}; + +const LOG_TARGET: &'static str = "comms::middleware::forward"; + +/// This layer is responsible for forwarding messages which have failed to decrypt +pub struct ForwardLayer { + peer_manager: Arc, + node_identity: Arc, + outbound_service: OutboundMessageRequester, +} + +impl ForwardLayer { + pub fn new( + peer_manager: Arc, + node_identity: Arc, + outbound_service: OutboundMessageRequester, + ) -> Self + { + Self { + peer_manager, + node_identity, + outbound_service, + } + } +} + +impl Layer for ForwardLayer { + type Service = ForwardMiddleware; + + fn layer(&self, service: S) -> Self::Service { + ForwardMiddleware::new( + service, + Arc::clone(&self.peer_manager), + Arc::clone(&self.node_identity), + self.outbound_service.clone(), + ) + } +} + +/// # Forward middleware +/// +/// Responsible for forwarding messages which fail to decrypt. +#[derive(Clone)] +pub struct ForwardMiddleware { + next_service: S, + peer_manager: Arc, + node_identity: Arc, + outbound_service: OutboundMessageRequester, +} + +impl ForwardMiddleware { + pub fn new( + service: S, + peer_manager: Arc, + node_identity: Arc, + outbound_service: OutboundMessageRequester, + ) -> Self + { + Self { + next_service: service, + peer_manager, + node_identity, + outbound_service, + } + } +} + +impl Service for ForwardMiddleware +where + S: Service + Clone + 'static, + S::Error: Into, +{ + type Error = MiddlewareError; + type Response = (); + + type Future = impl Future>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, msg: DecryptedDhtMessage) -> Self::Future { + Forwarder::new( + self.next_service.clone(), + Arc::clone(&self.peer_manager), + Arc::clone(&self.node_identity), + self.outbound_service.clone(), + ) + .handle(msg) + } +} + +/// Responsible for processing a single DecryptedDhtMessage, forwarding if necessary or passing the message +/// to the next service. +struct Forwarder { + peer_manager: Arc, + node_identity: Arc, + next_service: S, + outbound_service: OutboundMessageRequester, +} + +impl Forwarder { + pub fn new( + service: S, + peer_manager: Arc, + node_identity: Arc, + outbound_service: OutboundMessageRequester, + ) -> Self + { + Self { + peer_manager, + node_identity, + next_service: service, + outbound_service, + } + } +} + +impl Forwarder +where + S: Service, + S::Error: Into, +{ + async fn handle(mut self, message: DecryptedDhtMessage) -> Result<(), MiddlewareError> { + if message.decryption_succeeded() { + self.next_service.ready().await.map_err(Into::into)?; + self.next_service.call(message).await.map_err(Into::into) + } else { + self.forward(message).await.map_err(Into::into) + } + } + + async fn forward(&mut self, message: DecryptedDhtMessage) -> Result<(), DhtInboundError> { + let DecryptedDhtMessage { + comms_header, + decryption_result, + dht_header, + .. + } = message; + + let body = decryption_result.err().expect("previous check that decryption failed"); + + let broadcast_strategy = BroadcastStrategy::forward( + self.node_identity.identity.node_id.clone(), + &self.peer_manager, + dht_header.destination.clone(), + vec![comms_header.message_public_key.clone()], + )?; + + debug!(target: LOG_TARGET, "Forwarding message"); + self.outbound_service + .forward_message(broadcast_strategy, dht_header, body) + .await?; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + message::DhtMessageFlags, + outbound::DhtOutboundRequest, + test_utils::{make_dht_inbound_message, make_node_identity, make_peer_manager, service_spy}, + }; + use futures::{channel::mpsc, executor::block_on, StreamExt}; + use tari_comms::message::Message; + + #[test] + fn decryption_succeeded() { + let spy = service_spy(); + let peer_manager = make_peer_manager(); + let node_identity = make_node_identity(); + let (oms_tx, mut oms_rx) = mpsc::channel(1); + let oms = OutboundMessageRequester::new(oms_tx); + let mut service = ForwardLayer::new(peer_manager, node_identity, oms).layer(spy.service::()); + + let inbound_msg = make_dht_inbound_message(&make_node_identity(), b"".to_vec(), DhtMessageFlags::empty()); + let msg = DecryptedDhtMessage::succeed(Message::from_message_format((), ()).unwrap(), inbound_msg); + block_on(service.call(msg)).unwrap(); + assert!(spy.is_called()); + assert!(oms_rx.try_next().is_err()); + } + + #[test] + fn decryption_failed() { + let spy = service_spy(); + let peer_manager = make_peer_manager(); + let node_identity = make_node_identity(); + let (oms_tx, mut oms_rx) = mpsc::channel(1); + let oms = OutboundMessageRequester::new(oms_tx); + let mut service = ForwardLayer::new(peer_manager, node_identity, oms).layer(spy.service::()); + + let inbound_msg = make_dht_inbound_message(&make_node_identity(), b"".to_vec(), DhtMessageFlags::empty()); + let msg = DecryptedDhtMessage::fail(inbound_msg); + block_on(service.call(msg)).unwrap(); + assert_eq!(spy.is_called(), false); + let oms_req = block_on(oms_rx.next()).unwrap(); + + match oms_req { + DhtOutboundRequest::Forward(_) => {}, + _ => panic!("Unexpected OMS request"), + } + } +} diff --git a/comms/dht/src/inbound/message.rs b/comms/dht/src/inbound/message.rs new file mode 100644 index 0000000000..6bfad8cfca --- /dev/null +++ b/comms/dht/src/inbound/message.rs @@ -0,0 +1,108 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{consts::DHT_ENVELOPE_HEADER_VERSION, message::DhtHeader}; +use tari_comms::{ + message::{Message, MessageEnvelopeHeader}, + peer_manager::Peer, +}; + +pub struct DhtInboundMessage { + pub version: u8, + pub source_peer: Peer, + pub comms_header: MessageEnvelopeHeader, + pub dht_header: DhtHeader, + pub body: Vec, +} +impl DhtInboundMessage { + pub fn new(dht_header: DhtHeader, source_peer: Peer, comms_header: MessageEnvelopeHeader, body: Vec) -> Self { + Self { + version: DHT_ENVELOPE_HEADER_VERSION, + dht_header, + source_peer, + comms_header, + body, + } + } +} + +/// Represents a decrypted InboundMessage. +pub struct DecryptedDhtMessage { + pub version: u8, + pub source_peer: Peer, + pub comms_header: MessageEnvelopeHeader, + pub dht_header: DhtHeader, + pub decryption_result: Result>, +} + +impl DecryptedDhtMessage { + pub fn succeed(decrypted_message: Message, message: DhtInboundMessage) -> Self { + Self { + version: message.version, + source_peer: message.source_peer, + comms_header: message.comms_header, + dht_header: message.dht_header, + decryption_result: Ok(decrypted_message), + } + } + + pub fn fail(message: DhtInboundMessage) -> Self { + Self { + version: message.version, + source_peer: message.source_peer, + comms_header: message.comms_header, + dht_header: message.dht_header, + decryption_result: Err(message.body), + } + } + + pub fn inner_success(&self) -> &Message { + // Expect the caller to know that the decryption has succeeded + self.decryption_result + .as_ref() + .expect("called inner_success on failed decryption message") + } + + pub fn inner_fail(&self) -> &Vec { + // Expect the caller to know that the decryption has succeeded + self.decryption_result + .as_ref() + .err() + .expect("called inner_fail on succesfully decrypted message") + } + + pub fn failed(&self) -> Option<&Vec> { + self.decryption_result.as_ref().err() + } + + pub fn succeeded(&self) -> Option<&Message> { + self.decryption_result.as_ref().ok() + } + + pub fn decryption_succeeded(&self) -> bool { + self.decryption_result.is_ok() + } + + pub fn decryption_failed(&self) -> bool { + self.decryption_result.is_err() + } +} diff --git a/comms/dht/src/inbound/mod.rs b/comms/dht/src/inbound/mod.rs new file mode 100644 index 0000000000..074a7ac113 --- /dev/null +++ b/comms/dht/src/inbound/mod.rs @@ -0,0 +1,36 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +mod decryption; +mod deserialize; +mod dht_handler; +mod error; +mod forward; +mod message; + +pub use self::{ + decryption::DecryptionLayer, + deserialize::DeserializeLayer, + dht_handler::layer::DhtHandlerLayer, + forward::ForwardLayer, + message::{DecryptedDhtMessage, DhtInboundMessage}, +};