Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Event connected implementation #620

Merged
9 commits merged into from
Aug 19, 2015
Merged
6 changes: 5 additions & 1 deletion examples/simple_key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ use routing::data::{Data, DataRequest};
use routing::plain_data::PlainData;
use routing::utils::{encode};
use routing::{ExternalRequest, ExternalResponse, SignedToken};
use routing::id::Id;
use routing::public_id::PublicId;

// ========================== Program Options =================================
static USAGE: &'static str = "
Expand Down Expand Up @@ -279,7 +281,9 @@ impl Client {
fn new(_bootstrap_peers: Vec<Endpoint>) -> Result<Client, RoutingError> {
let (event_sender, event_receiver) = mpsc::channel::<Event>();

let routing = Routing::new_client(event_sender, None);
let id = Id::new();
println!("Client has set name {:?}", PublicId::new(&id));
let routing = Routing::new_client(event_sender, Some(id));

let (command_sender, command_receiver) = mpsc::channel::<UserCommand>();

Expand Down
1 change: 1 addition & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum Event {
// ~~|~~~~~~~~~~
// | our close group sorted from our name; always including our name
// | if size > 1, we are connected to the network
Bootstrapped,
Connected,
Disconnected,
FailedRequest(Authority, ExternalRequest, InterfaceError),
Expand Down
6 changes: 0 additions & 6 deletions src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ impl Id {
self.name
}

pub fn set_name(&mut self, name: NameType) {
// This function should not exist, it is here only temporarily
// to fix compilation.
self.name = name;
}

// name field is initially same as original_name, this should be later overwritten by
// relocated name provided by the network using this method
pub fn assign_relocated_name(&mut self, relocated_name: NameType) -> bool {
Expand Down
28 changes: 15 additions & 13 deletions src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Routing {

// start the handler for routing without a restriction to become a full node
let mut routing_node = RoutingNode::new(action_sender.clone(), action_receiver,
event_sender, false);
event_sender, false, None);

spawn(move || {
debug!("started routing run()");
Expand All @@ -71,24 +71,26 @@ impl Routing {
/// Starts a new RoutingIdentity, which will also start a new RoutingNode.
/// The RoutingNode will only bootstrap to the network and not attempt to
/// achieve full routing node status.
/// If the client is started with a relocated id (ie the name has been reassigned),
/// the core will instantely instantiate termination of the client.
pub fn new_client(event_sender : mpsc::Sender<Event>, keys : Option<Id>)
-> Routing {
sodiumoxide::init(); // enable shared global (i.e. safe to multithread now)
sodiumoxide::init(); // enable shared global (i.e. safe to multithread now)

let (action_sender, action_receiver) = mpsc::channel::<Action>();
let (action_sender, action_receiver) = mpsc::channel::<Action>();

// start the handler for routing with a restriction to become a full node
let mut routing_node = RoutingNode::new(action_sender.clone(), action_receiver,
event_sender, true);
// start the handler for routing with a restriction to become a full node
let mut routing_node = RoutingNode::new(action_sender.clone(), action_receiver,
event_sender, true, keys);

spawn(move || {
routing_node.run();
debug!("Routing node terminated running.");
});
spawn(move || {
routing_node.run();
debug!("Routing node terminated running.");
});

Routing {
action_sender : action_sender,
}
Routing {
action_sender : action_sender,
}
}

/// Send a Get message with a DataRequest to an Authority, signed with given keys.
Expand Down
63 changes: 53 additions & 10 deletions src/routing_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use id::Id;
use public_id::PublicId;
use NameType;
use peer::Peer;
use action::Action;
use event::Event;
use messages::RoutingMessage;

Expand All @@ -50,20 +51,32 @@ pub struct RoutingCore {
network_name : Option<NameType>,
routing_table : Option<RoutingTable>,
relay_map : RelayMap,
// sender for signaling churn events
// sender for signaling events and action
event_sender : Sender<Event>,
action_sender : Sender<Action>,
}

impl RoutingCore {
/// Start a RoutingCore with a new Id and the disabled RoutingTable
pub fn new(event_sender : Sender<Event>) -> RoutingCore {
let id = Id::new();
pub fn new(event_sender : Sender<Event>, action_sender : Sender<Action>,
keys : Option<Id>) -> RoutingCore {
let id = match keys {
Some(id) => id,
None => Id::new(),
};
// nodes are not persistant, and a client has no network allocated name
if id.is_relocated() {
error!("Core terminates routing as initialised with relocated id {:?}",
PublicId::new(&id));
let _ = action_sender.send(Action::Terminate); };

RoutingCore {
id : id,
network_name : None,
routing_table : None,
relay_map : RelayMap::new(),
event_sender : event_sender,
action_sender : action_sender,
}
}

Expand Down Expand Up @@ -183,7 +196,12 @@ impl RoutingCore {
Some(ref mut routing_table) => {
let trigger_churn = routing_table
.address_in_our_close_group_range(&name);
let routing_table_count_prior = routing_table.size();
routing_table.drop_node(&name);
if routing_table_count_prior == 1usize {
error!("Routing Node has disconnected.");
let _ = self.event_sender.send(Event::Disconnected);
let _ = self.action_sender.send(Action::Terminate); };
info!("RT({:?}) dropped node {:?}", routing_table.size(), name);
if trigger_churn {
let mut close_group : Vec<NameType> = routing_table
Expand All @@ -198,7 +216,15 @@ impl RoutingCore {
None => None,
}
},
_ => self.relay_map.drop_connection_name(connection_name)
_ => {
let bootstrapped_prior = self.relay_map.has_bootstrap_connections();
let dropped_peer = self.relay_map.drop_connection_name(connection_name);
let bootstrapped_posterior = self.relay_map.has_bootstrap_connections();
if !bootstrapped_posterior && bootstrapped_prior && !self.is_node() {
error!("Routing Client has disconnected.");
let _ = self.event_sender.send(Event::Disconnected);
let _ = self.action_sender.send(Action::Terminate); };
dropped_peer },
}
}

Expand All @@ -217,15 +243,21 @@ impl RoutingCore {
.address_in_our_close_group_range(&routing_name);
let node_info = NodeInfo::new(given_public_id,
vec![endpoint.clone()], Some(endpoint));
let routing_table_count_prior = routing_table.size();
// TODO (ben 10/08/2015) drop connection of dropped node
let (added, _) = routing_table.add_node(node_info);
if added { info!("RT({:?}) added {:?}", routing_table.size(),
routing_name); };
if added {
// if we transition from zero to one routing connection
if routing_table_count_prior == 0usize {
info!("Routing Node has connected.");
let _ = self.event_sender.send(Event::Connected); };
info!("RT({:?}) added {:?}", routing_table.size(),
routing_name); };
if added && trigger_churn {
let mut close_group : Vec<NameType> = routing_table
.our_close_group().iter()
.map(|node_info| node_info.fob.name())
.collect::<Vec<NameType>>();
.our_close_group().iter()
.map(|node_info| node_info.fob.name())
.collect::<Vec<NameType>>();
close_group.insert(0, self.id.name());
let _ = self.event_sender.send(Event::Churn(close_group));
};
Expand All @@ -237,7 +269,18 @@ impl RoutingCore {
}
},
_ => {
self.relay_map.add_peer(identity, endpoint, public_id)
let bootstrapped_prior = self.relay_map.has_bootstrap_connections();
let is_bootstrap_connection = match identity {
ConnectionName::Bootstrap(_) => true,
_ => false,
};
let added = self.relay_map.add_peer(identity, endpoint, public_id);
if !bootstrapped_prior && added && is_bootstrap_connection
&& self.routing_table.is_none() {
info!("Routing Client bootstrapped.");
let _ = self.event_sender.send(Event::Bootstrapped);
};
added
},
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/routing_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ impl RoutingNode {
pub fn new(action_sender : mpsc::Sender<Action>,
action_receiver : mpsc::Receiver<Action>,
event_sender : mpsc::Sender<Event>,
client_restriction : bool) -> RoutingNode {
client_restriction : bool,
keys : Option<Id>) -> RoutingNode {

let (crust_sender, crust_receiver) = mpsc::channel::<crust::Event>();
let mut cm = crust::ConnectionManager::new(crust_sender);
let _ = cm.start_accepting(vec![Port::Tcp(5483u16)]);
let accepting_on = cm.get_own_endpoints();

let core = RoutingCore::new(event_sender.clone());
let core = RoutingCore::new(event_sender.clone(), action_sender.clone(), keys);
info!("RoutingNode {:?} listens on {:?}", core.our_address(), accepting_on);

RoutingNode {
Expand Down Expand Up @@ -151,6 +152,7 @@ impl RoutingNode {
},
Ok(Action::Terminate) => {
debug!("routing node terminated");
let _ = self.event_sender.send(Event::Terminated);
self.connection_manager.stop();
break;
},
Expand Down