Skip to content

Commit

Permalink
Merge pull request maidsafe#314 from inetic/channels
Browse files Browse the repository at this point in the history
Channels
  • Loading branch information
vinipsmaker committed Sep 10, 2015
2 parents 581a762 + 8c34c73 commit 706a6f8
Show file tree
Hide file tree
Showing 16 changed files with 1,397 additions and 1,422 deletions.
28 changes: 13 additions & 15 deletions benches/send_random_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,34 @@ fn wait_for_connection(receiver: &Receiver<Event>) -> Endpoint{

#[bench]
fn send_random_data(b: &mut Bencher) {
let (cm1_tx, cm1_rx) = channel();
let mut cm1 = ConnectionManager::new_inactive(cm1_tx).unwrap();
let (s1_tx, s1_rx) = channel();
let mut s1 = Service::new_inactive(s1_tx).unwrap();

let cm1_port = match cm1.start_accepting(Port::Tcp(0)) {
let s1_port = match s1.start_accepting(Port::Tcp(0)) {
Ok(port) => port,
Err(_) => panic!("Failed to start ConnectionManager #1"),
Err(_) => panic!("Failed to start Service #1"),
};

let cm1_endpoint = Endpoint::new(IpAddr::V4(Ipv4Addr::new(127,0,0,1)), cm1_port);
let s1_endpoint = Endpoint::new(IpAddr::V4(Ipv4Addr::new(127,0,0,1)), s1_port);

let (cm2_tx, cm2_rx) = channel();
let cm2 = ConnectionManager::new_inactive(cm2_tx).unwrap();
let (s2_tx, s2_rx) = channel();
let s2 = Service::new_inactive(s2_tx).unwrap();

cm2.connect(vec![cm1_endpoint]);
s2.connect(vec![s1_endpoint]);

let _cm2_ep = wait_for_connection(&cm1_rx);
let cm1_ep = wait_for_connection(&cm2_rx);
let _s2_ep = wait_for_connection(&s1_rx);
let s1_ep = wait_for_connection(&s2_rx);

let data = generate_random_vec_u8(1024 * 1024);
let data_len = data.len();

b.iter(move || {
if let Err(what) = cm2.send(cm1_ep.clone(), data.clone()) {
panic!(format!("ConnectionManager #2 failed to send data: {:?}", what));
}
s2.send(s1_ep.clone(), data.clone());

loop {
let event = match cm1_rx.recv() {
let event = match s1_rx.recv() {
Ok(event) => event,
Err(_) => panic!("ConnectionManager #1 closed connection"),
Err(_) => panic!("Service #1 closed connection"),
};

match event {
Expand Down
56 changes: 11 additions & 45 deletions examples/crust_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use std::net::SocketAddr;
use std::str::FromStr;
use std::thread;

use crust::{ConnectionManager, Endpoint};
use crust::{Service, Endpoint};

static USAGE: &'static str = "
Usage:
Expand Down Expand Up @@ -359,20 +359,20 @@ fn main() {
let mut stdout = term::stdout();
let mut stdout_copy = term::stdout();

// Construct ConnectionManager and start listening
// Construct Service and start listening
let (channel_sender, channel_receiver) = channel();
let (bs_sender, bs_receiver) = channel();
let mut connection_manager = ConnectionManager::new(channel_sender).unwrap();
let mut service = Service::new(channel_sender).unwrap();
stdout = green_foreground(stdout);
let listening_endpoints = connection_manager.get_own_endpoints();
let listening_endpoints = service.get_own_endpoints();
print!("Listening for new connections on");
for endpoint in &listening_endpoints {
print!(" {:?}", *endpoint);
};
println!("");

stdout = reset_foreground(stdout);
connection_manager.bootstrap(15);
service.bootstrap(15);

// Start event-handling thread
let running_speed_test = args.flag_speed.is_some();
Expand Down Expand Up @@ -456,17 +456,8 @@ fn main() {
let times = cmp::max(1, speed / length);
let sleep_time = cmp::max(1, 1000 / times);
for _ in 0..times {
match connection_manager.send(peer.clone(),
generate_random_vec_u8(length as usize)) {
Ok(()) => debug!("Sent a message with length of {} bytes to {:?}", length,
peer),
Err(_) => {
stdout = red_foreground(stdout);
debug!("Lost connection to peer. Exiting.");
let _ = reset_foreground(stdout);
return;
},
};
service.send(peer.clone(), generate_random_vec_u8(length as usize));
debug!("Sent a message with length of {} bytes to {:?}", length, peer);
std::thread::sleep_ms(sleep_time as u32);
}
}
Expand Down Expand Up @@ -499,7 +490,7 @@ fn main() {
let ep = args.arg_peer.unwrap().addr;
/* if utp_mode { Endpoint::Utp(ep) } else { */Endpoint::Tcp(ep) //}
}];
connection_manager.connect(peer);
service.connect(peer);
} else if args.cmd_send {
// docopt should ensure arg_peer and arg_message are valid
assert!(args.arg_peer.is_some());
Expand All @@ -513,32 +504,7 @@ fn main() {
message.push_str(" ");
message.push_str(&args.arg_message[i]);
};
match connection_manager.send(peer.clone(), message.clone().into_bytes()) {
Ok(()) => {
stdout = green_foreground(stdout);
println!("Successfully sent \"{}\" to {:?}", message, peer);
stdout = reset_foreground(stdout)
},
Err(error) => {
match error.kind() {
io::ErrorKind::NotConnected => {
stdout = yellow_foreground(stdout);
println!("Failed to send: we have no connection to {:?}", peer);
stdout = reset_foreground(stdout)
},
io::ErrorKind::BrokenPipe => {
stdout = yellow_foreground(stdout);
println!("Failed to send to {:?}: internal channel error.", peer);
stdout = reset_foreground(stdout)
},
_ => {
stdout = yellow_foreground(stdout);
println!("Failed to send to {:?}: unexpected error.", peer);
stdout = reset_foreground(stdout)
},
}
},
}
service.send(peer.clone(), message.clone().into_bytes());
} else if args.cmd_stop {
stdout = green_foreground(stdout);
println!("Stopped.");
Expand All @@ -547,7 +513,7 @@ fn main() {
}
}
}
connection_manager.stop();
drop(connection_manager);
service.stop();
drop(service);
let _ = handler.join();
}
10 changes: 4 additions & 6 deletions examples/simple_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ fn main() {
Err(e) => println!("Error initialising logger; continuing without: {:?}", e)
}

// The ConnectionManager will probably create a "user app directory" (see the docs for
// The Service will probably create a "user app directory" (see the docs for
// `FileHandler::write_file()`). This object will try to clean up this directory when it goes
// out of scope. Normally apps would not do this - this directory will hold the peristent cache
// files.
let _cleaner = ::crust::ScopedUserAppDirRemover;

// We receive events (e.g. new connection, message received) from the ConnectionManager via an
// We receive events (e.g. new connection, message received) from the Service via an
// asynchronous channel.
let (channel_sender, channel_receiver) = ::std::sync::mpsc::channel();
let connection_manager = ::crust::ConnectionManager::new(channel_sender)
let service = ::crust::Service::new(channel_sender)
.unwrap();

println!("Run the simple_sender example in another terminal to send messages to this node.");
Expand Down Expand Up @@ -88,9 +88,7 @@ fn main() {
endpoint, fibonacci_result);
let response =
format!("The Fibonacci number for {} is {}", requested_value, fibonacci_result);
if let Err(why) = connection_manager.send(endpoint.clone(), response.into_bytes()) {
println!("Failed to send reply to {:?}: {}", endpoint, why)
}
service.send(endpoint.clone(), response.into_bytes());
},
crust::Event::NewConnection(endpoint) => {
println!("New connection made to {:?}", endpoint);
Expand Down
14 changes: 6 additions & 8 deletions examples/simple_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ fn main() {
Err(e) => println!("Error initialising logger; continuing without: {:?}", e)
}

// The ConnectionManager will probably create a "user app directory" (see the docs for
// The Service will probably create a "user app directory" (see the docs for
// `FileHandler::write_file()`). This object will try to clean up this directory when it goes
// out of scope. Normally apps would not do this - this directory will hold the peristent cache
// files.
let _cleaner = ::crust::ScopedUserAppDirRemover;

// We receive events (e.g. new connection, message received) from the ConnectionManager via an
// We receive events (e.g. new connection, message received) from the Service via an
// asynchronous channel.
let (channel_sender, channel_receiver) = ::std::sync::mpsc::channel();
let mut connection_manager = ::crust::ConnectionManager::new(channel_sender).unwrap();
let mut service = ::crust::Service::new(channel_sender).unwrap();

let (bs_sender, bs_receiver) = ::std::sync::mpsc::channel();
// Start a thread running a loop which will receive and display responses from the peer.
Expand Down Expand Up @@ -71,9 +71,9 @@ fn main() {
println!("Stopped receiving.");
});

connection_manager.bootstrap(1);
service.bootstrap(1);

println!("ConnectionManager trying to bootstrap off node listening on TCP port 8888 \
println!("Service trying to bootstrap off node listening on TCP port 8888 \
and UDP broadcast port 5484");

// Block until bootstrapped
Expand All @@ -90,9 +90,7 @@ fn main() {
// Send all the numbers from 0 to 12 inclusive. Expect to receive replies containing the
// Fibonacci number for each value.
for value in (0u8..13u8) {
if let Err(why) = connection_manager.send(peer_endpoint.clone(), value.to_string().into_bytes()) {
println!("Failed to send {} to {:?}: {}", value, peer_endpoint, why)
}
service.send(peer_endpoint.clone(), value.to_string().into_bytes());
}

// Allow the peer time to process the requests and reply.
Expand Down
72 changes: 24 additions & 48 deletions installer/sample.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,76 +4,52 @@
"override_default_bootstrap": false,
"hard_coded_contacts": [
{
"endpoint": {
"protocol": "tcp",
"address": "45.55.207.180:5483"
}
"protocol": "tcp",
"address": "45.55.207.180:5483"
},
{
"endpoint": {
"protocol": "utp",
"address": "45.55.207.180:5483"
}
"protocol": "utp",
"address": "45.55.207.180:5483"
},
{
"endpoint": {
"protocol": "tcp",
"address": "178.62.7.96:5483"
}
"protocol": "tcp",
"address": "178.62.7.96:5483"
},
{
"endpoint": {
"protocol": "utp",
"address": "178.62.7.96:5483"
}
"protocol": "utp",
"address": "178.62.7.96:5483"
},
{
"endpoint": {
"protocol": "tcp",
"address": "128.199.199.210:5483"
}
"protocol": "tcp",
"address": "128.199.199.210:5483"
},
{
"endpoint": {
"protocol": "utp",
"address": "128.199.199.210:5483"
}
"protocol": "utp",
"address": "128.199.199.210:5483"
},
{
"endpoint": {
"protocol": "tcp",
"address": "37.59.98.1:5483"
}
"protocol": "tcp",
"address": "37.59.98.1:5483"
},
{
"endpoint": {
"protocol": "utp",
"address": "37.59.98.1:5483"
}
"protocol": "utp",
"address": "37.59.98.1:5483"
},
{
"endpoint": {
"protocol": "tcp",
"address": "45.79.93.11:5483"
}
"protocol": "tcp",
"address": "45.79.93.11:5483"
},
{
"endpoint": {
"protocol": "utp",
"address": "45.79.93.11:5483"
}
"protocol": "utp",
"address": "45.79.93.11:5483"
},
{
"endpoint": {
"protocol": "tcp",
"address": "45.79.2.52:5483"
}
"protocol": "tcp",
"address": "45.79.2.52:5483"
},
{
"endpoint": {
"protocol": "utp",
"address": "45.79.2.52:5483"
}
"protocol": "utp",
"address": "45.79.2.52:5483"
}
],
"beacon_port": 5484
Expand Down
Loading

0 comments on commit 706a6f8

Please sign in to comment.