Skip to content

Commit

Permalink
Merge pull request #752 from tari-project/sb-comms-oms-futures-03
Browse files Browse the repository at this point in the history
Created async outbound messsage service

Description
Created an async outbound message service which is responsible for
constructing messages from a broadcast strategy, requesting connections
from the ConnectionManager and sending batches of messages. As before, failed
connections to a peer are retried using a exponential backoff strategy.

Functionally similar to the current OMS, except that it is asynchronous and lock free (it
connects and sends messages on two tasks).

Currently this is not included in the module tree to keep the PR
smaller. A subsequent PR will make use of the async OMS.

Motivation and Context
Ref #655
Ref #708

How Has This Been Tested?
Unit test in the module - subsequent PR will contain integration tests using the new OMS

Types of changes
  • Loading branch information
CjS77 authored Sep 12, 2019
2 parents ba572ec + e47616b commit 5a88923
Show file tree
Hide file tree
Showing 6 changed files with 1,099 additions and 0 deletions.
130 changes: 130 additions & 0 deletions comms/src/outbound_service/broadcast_strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2019 The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
consts::DHT_FORWARD_NODE_COUNT,
message::NodeDestination,
peer_manager::{node_id::NodeId, peer_manager::PeerManager, PeerManagerError},
types::CommsPublicKey,
};
use derive_error::Error;
use std::sync::Arc;

#[derive(Debug, Error)]
pub enum BroadcastStrategyError {
PeerManagerError(PeerManagerError),
}

#[derive(Debug)]
pub struct ClosestRequest {
pub n: usize,
pub node_id: NodeId,
pub excluded_peers: Vec<CommsPublicKey>,
}

#[derive(Debug)]
pub enum BroadcastStrategy {
/// Send to a particular peer matching the given node ID
DirectNodeId(NodeId),
/// Send to a particular peer matching the given Public Key
DirectPublicKey(CommsPublicKey),
/// Send to all known Communication Node peers
Flood,
/// Send to all n nearest neighbour Communication Nodes
Closest(ClosestRequest),
/// Send to a random set of peers of size n that are Communication Nodes
Random(usize),
}

impl BroadcastStrategy {
/// The forward function selects the most appropriate broadcast strategy based on the received messages destination
pub fn forward(
source_node_id: NodeId,
peer_manager: &Arc<PeerManager>,
header_dest: NodeDestination<CommsPublicKey>,
excluded_peers: Vec<CommsPublicKey>,
) -> Result<Self, BroadcastStrategyError>
{
Ok(match header_dest {
NodeDestination::Unknown => {
// Send to the current nodes nearest neighbours
BroadcastStrategy::Closest(ClosestRequest {
n: DHT_FORWARD_NODE_COUNT,
node_id: source_node_id,
excluded_peers,
})
},
NodeDestination::PublicKey(dest_public_key) => {
if peer_manager.exists(&dest_public_key)? {
// Send to destination peer directly if the current node knows that peer
BroadcastStrategy::DirectPublicKey(dest_public_key)
} else {
// Send to the current nodes nearest neighbours
BroadcastStrategy::Closest(ClosestRequest {
n: DHT_FORWARD_NODE_COUNT,
node_id: source_node_id,
excluded_peers,
})
}
},
NodeDestination::NodeId(dest_node_id) => {
match peer_manager.find_with_node_id(&dest_node_id) {
Ok(dest_peer) => {
// Send to destination peer directly if the current node knows that peer
BroadcastStrategy::DirectPublicKey(dest_peer.public_key)
},
Err(_) => {
// Send to peers that are closest to the destination network region
BroadcastStrategy::Closest(ClosestRequest {
n: DHT_FORWARD_NODE_COUNT,
node_id: dest_node_id,
excluded_peers,
})
},
}
},
})
}

/// The discover function selects an appropriate broadcast strategy for the discovery of a specific node
pub fn discover(
source_node_id: NodeId,
dest_node_id: Option<NodeId>,
header_dest: NodeDestination<CommsPublicKey>,
excluded_peers: Vec<CommsPublicKey>,
) -> Self
{
let network_location_node_id = match dest_node_id {
Some(node_id) => node_id,
None => match header_dest.clone() {
NodeDestination::Unknown => source_node_id,
NodeDestination::PublicKey(_) => source_node_id,
NodeDestination::NodeId(node_id) => node_id,
},
};
BroadcastStrategy::Closest(ClosestRequest {
n: DHT_FORWARD_NODE_COUNT,
node_id: network_location_node_id,
excluded_peers,
})
}
}
41 changes: 41 additions & 0 deletions comms/src/outbound_service/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2019 The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
connection::ConnectionError,
connection_manager::ConnectionManagerError,
message::MessageError,
peer_manager::PeerManagerError,
};
use derive_error::Error;
use futures::channel::mpsc::SendError;
use tari_utilities::message_format::MessageFormatError;

#[derive(Debug, Error)]
pub enum OutboundServiceError {
SendError(SendError),
MessageSerializationError(MessageError),
MessageFormatError(MessageFormatError),
PeerManagerError(PeerManagerError),
ConnectionManagerError(ConnectionManagerError),
ConnectionError(ConnectionError),
}
101 changes: 101 additions & 0 deletions comms/src/outbound_service/messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2019 The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
message::{Frame, FrameSet, MessageEnvelope, MessageFlags},
outbound_message_service::broadcast_strategy::BroadcastStrategy,
peer_manager::node_id::NodeId,
};
use serde::{Deserialize, Serialize};

/// Represents requests to the CommsOutboundService
#[derive(Debug)]
pub enum OutboundRequest {
/// Send a message using the given broadcast strategy
SendMsg {
broadcast_strategy: BroadcastStrategy,
flags: MessageFlags,
body: Box<Frame>,
},
/// Forward a message envelope
Forward {
broadcast_strategy: BroadcastStrategy,
message_envelope: Box<MessageEnvelope>,
},
}

/// The OutboundMessage has a copy of the MessageEnvelope. OutboundMessageService will create the
/// OutboundMessage and forward it to the OutboundMessagePool.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct OutboundMessage {
destination_node_id: NodeId,
message_frames: FrameSet,
}

impl OutboundMessage {
/// Create a new OutboundMessage from the destination_node_id and message_frames
pub fn new(destination: NodeId, message_frames: FrameSet) -> OutboundMessage {
OutboundMessage {
destination_node_id: destination,
message_frames,
}
}

/// Get a reference to the destination NodeID
pub fn destination_node_id(&self) -> &NodeId {
&self.destination_node_id
}

/// Get a reference to the message frames
pub fn message_frames(&self) -> &FrameSet {
&self.message_frames
}

/// Consume this wrapper and return ownership of the frames
pub fn into_frames(self) -> FrameSet {
self.message_frames
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn new() {
let node_id = NodeId::new();
let subject = OutboundMessage::new(node_id.clone(), vec![vec![1]]);
assert_eq!(subject.message_frames[0].len(), 1);
assert_eq!(subject.destination_node_id, node_id);
}

#[test]
fn getters() {
let node_id = NodeId::new();
let frames = vec![vec![1]];
let subject = OutboundMessage::new(node_id.clone(), frames.clone());

assert_eq!(subject.destination_node_id(), &node_id);
assert_eq!(subject.message_frames(), &frames);
assert_eq!(subject.into_frames(), frames);
}
}
34 changes: 34 additions & 0 deletions comms/src/outbound_service/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2019 The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod broadcast_strategy;
mod error;
mod messages;
mod service;
mod worker;

pub use self::{
broadcast_strategy::BroadcastStrategy,
error::OutboundServiceError,
messages::OutboundRequest,
service::{OutboundMessageService, OutboundServiceConfig, OutboundServiceRequester},
};
Loading

0 comments on commit 5a88923

Please sign in to comment.