Skip to content

Commit

Permalink
More tests (#94)
Browse files Browse the repository at this point in the history
* fix/docs: fix mio types we use

* test/udp_rendezvous_server: add integration test;
  also exposed UdpRendezvousClient (not as a crate API though,
  only for internal modules for testing).

* make UdpRendezvousServer return it's listen address (in case
  someone specifies port 0 for it)

* fixed UdpRendezvousServer on Windows where the client tried to
  connect with the server using 0.0.0.0 IP address.

* fix/ci: skip rustfmt on Windows

We hit the rustfmt bug: rust-lang/rustfmt#1873 .
Currently, rustfmt fails to format when path attribute has relative
paths.

Usually, our code does not have too much windows specific code and in
this case there's no such code at the moment. So, it's the easiest fix
is to disable rustfmt on Windows for now hoping for the fix in the future.
  • Loading branch information
povilasb authored and ustulation committed Nov 27, 2018
1 parent 93290d0 commit c7f99c7
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 226 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ before_script:
fi
script:
- set -x;
cargo fmt -- --check &&
cargo test --verbose --release
- if [ "${TRAVIS_OS_NAME}" = linux ]; then
(
set -x;
cargo fmt -- --check &&
cargo clippy --verbose --release &&
cargo clippy --verbose --release --profile=test ;
)
Expand Down
12 changes: 9 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,15 @@ extern crate serde_derive;
extern crate unwrap;

extern crate bincode;
#[cfg(test)]
extern crate maidsafe_utilities;
extern crate mio;
extern crate mio_extras;
extern crate net2;
extern crate rand;
extern crate rust_sodium as sodium;
#[cfg(test)]
extern crate serde_json;
extern crate socket_collection;

use bincode::{deserialize, serialize, Infinite};
Expand All @@ -300,6 +304,8 @@ mod error;
mod hole_punch;
mod queued_notifier;
mod tcp;
#[cfg(test)]
mod test_utils;
mod udp;

pub use config::Config;
Expand Down Expand Up @@ -346,7 +352,7 @@ impl NatTimer {
/// A message that can be sent to the event loop to perform an action.
///
/// This can be used to send actions from a thread outside the event loop too if sent via
/// [`mio::channel::Sender`][0].
/// [`mio_extras::channel::Sender`][0].
///
/// [0]: http://rust-doc.s3-website-us-east-1.amazonaws.com/mio/master/mio/channel
pub struct NatMsg(Box<FnMut(&mut Interface, &Poll) + Send + 'static>);
Expand Down Expand Up @@ -408,8 +414,8 @@ pub trait Interface {
fn remove_state(&mut self, token: Token) -> Option<Rc<RefCell<NatState>>>;
/// Return the state (without removing - just a query) associated with the `token`
fn state(&mut self, token: Token) -> Option<Rc<RefCell<NatState>>>;
/// Set timeout. User code is expected to have a `mio::timer::Timer<NatTimer>` on which the
/// timeout can be set.
/// Set timeout. User code is expected to have a `mio_extras::timer::Timer<NatTimer>` on which
/// the timeout can be set.
fn set_timeout(&mut self, duration: Duration, timer_detail: NatTimer) -> Timeout;
/// Cancel the timout
fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<NatTimer>;
Expand Down
203 changes: 203 additions & 0 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
use super::{Config, Interface, NatMsg, NatState, NatTimer};
use maidsafe_utilities::thread::{self, Joiner};
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_extras::channel;
use mio_extras::channel::Sender;
use mio_extras::timer::{Timeout, Timer};
use serde_json;
use sodium::crypto::box_;
use std::any::Any;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::rc::Rc;
use std::time::Duration;

/// Simplified state machine implementation for tests.
pub struct StateMachine {
pub nat_states: HashMap<Token, Rc<RefCell<NatState>>>,
pub timer: Timer<NatTimer>,
pub token: usize,
pub config: Config,
pub enc_pk: box_::PublicKey,
pub enc_sk: box_::SecretKey,
pub tx: Sender<NatMsg>,
}

impl StateMachine {
pub fn handle_nat_timer(&mut self, poll: &Poll) {
while let Some(nat_timer) = self.timer.poll() {
if let Some(nat_state) = self.state(nat_timer.associated_nat_state) {
nat_state
.borrow_mut()
.timeout(self, poll, nat_timer.timer_id);
}
}
}

pub fn handle_readiness(&mut self, poll: &Poll, token: Token, kind: Ready) {
if let Some(nat_state) = self.state(token) {
return nat_state.borrow_mut().ready(self, poll, kind);
}
}
}

impl Interface for StateMachine {
fn insert_state(
&mut self,
token: Token,
state: Rc<RefCell<NatState>>,
) -> Result<(), (Rc<RefCell<NatState>>, String)> {
if let Entry::Vacant(ve) = self.nat_states.entry(token) {
let _ = ve.insert(state);
Ok(())
} else {
Err((state, "Token is already mapped".to_string()))
}
}

fn remove_state(&mut self, token: Token) -> Option<Rc<RefCell<NatState>>> {
self.nat_states.remove(&token)
}

fn state(&mut self, token: Token) -> Option<Rc<RefCell<NatState>>> {
self.nat_states.get(&token).cloned()
}

fn set_timeout(&mut self, duration: Duration, timer_detail: NatTimer) -> Timeout {
self.timer.set_timeout(duration, timer_detail)
}

fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<NatTimer> {
self.timer.cancel_timeout(timeout)
}

fn new_token(&mut self) -> Token {
self.token += 1;
Token(self.token)
}

fn config(&self) -> &Config {
&self.config
}

fn enc_pk(&self) -> &box_::PublicKey {
&self.enc_pk
}

fn enc_sk(&self) -> &box_::SecretKey {
&self.enc_sk
}

fn sender(&self) -> &Sender<NatMsg> {
&self.tx
}

fn as_any(&mut self) -> &mut Any {
self
}
}

/// Spawn testing event loop in a separate thread and return a handle to control it.
pub fn spawn_event_loop(config: Config) -> EventLoop {
let (done_tx, done_rx) = channel::channel();
let (nat_tx, nat_rx) = channel::channel();
let nat_tx2 = nat_tx.clone();

let j = thread::named("Event-Loop", move || {
const TIMER_TOKEN: usize = 0;
const DONE_RX_TOKEN: usize = TIMER_TOKEN + 1;
const NAT_RX_TOKEN: usize = DONE_RX_TOKEN + 1;

let poll = unwrap!(Poll::new());

let (enc_pk, enc_sk) = box_::gen_keypair();
let timer = Timer::default();

unwrap!(poll.register(
&timer,
Token(TIMER_TOKEN),
Ready::readable(),
PollOpt::edge(),
));
unwrap!(poll.register(
&done_rx,
Token(DONE_RX_TOKEN),
Ready::readable(),
PollOpt::edge(),
));
unwrap!(poll.register(
&nat_rx,
Token(NAT_RX_TOKEN),
Ready::readable(),
PollOpt::edge(),
));

let mut sm = StateMachine {
nat_states: HashMap::with_capacity(10),
timer,
token: NAT_RX_TOKEN + 1,
config,
enc_pk,
enc_sk,
tx: nat_tx,
};

let mut events = Events::with_capacity(1024);

'event_loop: loop {
unwrap!(poll.poll(&mut events, None));

for event in events.iter() {
match event.token() {
Token(TIMER_TOKEN) => {
assert!(event.readiness().is_readable());
sm.handle_nat_timer(&poll);
}
Token(DONE_RX_TOKEN) => {
assert!(event.readiness().is_readable());
unwrap!(done_rx.try_recv());
break 'event_loop;
}
Token(NAT_RX_TOKEN) => {
assert!(event.readiness().is_readable());
while let Ok(f) = nat_rx.try_recv() {
f.invoke(&mut sm, &poll);
}
}
t => sm.handle_readiness(&poll, t, event.readiness()),
}
}
}
});

EventLoop {
nat_tx: nat_tx2,
done_tx,
_j: j,
}
}

/// Handle to event loop running in a separate thread.
pub struct EventLoop {
pub nat_tx: Sender<NatMsg>,
pub done_tx: Sender<()>,
_j: Joiner,
}

impl Drop for EventLoop {
fn drop(&mut self) {
unwrap!(self.done_tx.send(()));
}
}

/// Read p2p config from json file.
#[allow(unused)]
pub fn read_config(path: &str) -> Config {
let mut file = unwrap!(File::open(path));
let mut content = String::new();
unwrap!(file.read_to_string(&mut content));
unwrap!(serde_json::from_str(&content))
}
2 changes: 1 addition & 1 deletion src/udp/hole_punch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use self::puncher::Puncher;
use self::rendezvous_client::UdpRendezvousClient;
pub use self::rendezvous_client::UdpRendezvousClient;
use mio::Poll;
use mio::Token;
use socket_collection::UdpSock;
Expand Down
2 changes: 2 additions & 0 deletions src/udp/hole_punch/rendezvous_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use {Interface, NatError, NatState, NatType};

pub type Finish = Box<FnMut(&mut Interface, &Poll, Token, NatType, ::Res<(UdpSock, SocketAddr)>)>;

/// UDP rendezvous client that queries the server for it's public endpoint - IP:port.
pub struct UdpRendezvousClient {
sock: UdpSock,
token: Token,
Expand All @@ -25,6 +26,7 @@ pub struct UdpRendezvousClient {
}

impl UdpRendezvousClient {
/// Starts sending queries to multiple servers. Servers are retrieved from `ifc.config()`.
pub fn start(ifc: &mut Interface, poll: &Poll, sock: UdpSock, f: Finish) -> ::Res<Token> {
let token = ifc.new_token();
let mut servers = ifc.config().remote_udp_rendezvous_servers.clone();
Expand Down
76 changes: 73 additions & 3 deletions src/udp/rendezvous_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,22 @@ pub struct UdpRendezvousServer {

impl UdpRendezvousServer {
/// Boot the UDP Rendezvous server. This should normally be called only once.
pub fn start(ifc: &mut Interface, poll: &Poll) -> ::Res<Token> {
/// Uses the bind port specified by `Interface::config()`. If port is 0, OS chooses a random
/// available port, which you can find out by checking the return value.
///
/// # Returns
///
/// A tuple of mio token associated with the rendezvous server and local listen address.
pub fn start(ifc: &mut Interface, poll: &Poll) -> ::Res<(Token, SocketAddr)> {
let port = ifc
.config()
.udp_rendezvous_port
.unwrap_or(UDP_RENDEZVOUS_PORT);
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port));
let sock = UdpSock::bind(&addr)?;
let server_addr = sock.local_addr()?;

let token = ifc.new_token();

poll.register(
&sock,
token,
Expand All @@ -54,7 +60,7 @@ impl UdpRendezvousServer {
server.borrow_mut().terminate(ifc, poll);
Err(NatError::UdpRendezvousServerStartFailed)
} else {
Ok(token)
Ok((token, server_addr))
}
}

Expand Down Expand Up @@ -117,3 +123,67 @@ impl NatState for UdpRendezvousServer {
self
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::sync::mpsc;
use test_utils::spawn_event_loop;
use udp::hole_punch::UdpRendezvousClient;
use {Config, NatMsg};

/// Creates config for tests with all values zeroed.
fn p2p_test_cfg() -> Config {
Config {
rendezvous_timeout_sec: None,
hole_punch_timeout_sec: None,
hole_punch_wait_for_other: None,
udp_rendezvous_port: None,
tcp_rendezvous_port: None,
remote_udp_rendezvous_servers: Vec::new(),
remote_tcp_rendezvous_servers: Vec::new(),
udp_hole_punchers: Vec::new(),
}
}

#[test]
fn it_responds_with_client_address() {
let mut config = p2p_test_cfg();
config.udp_rendezvous_port = Some(0);
let server_el = spawn_event_loop(config);
let (server_port_tx, server_port_rx) = mpsc::channel();
unwrap!(server_el.nat_tx.send(NatMsg::new(move |ifc, poll| {
let (_token, addr) = unwrap!(UdpRendezvousServer::start(ifc, poll));
unwrap!(server_port_tx.send(addr.port()));
})));

let server_port = unwrap!(server_port_rx.recv());
let server_addr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), server_port));
let (addr_tx, addr_rx) = mpsc::channel();
let mut config = p2p_test_cfg();
config.remote_udp_rendezvous_servers = vec![server_addr];
let client_el = spawn_event_loop(config);
let addr = unwrap!("127.0.0.1:0".parse());
let sock = unwrap!(UdpSock::bind(&addr));
let exp_client_addr = unwrap!(sock.local_addr());

unwrap!(client_el.nat_tx.send(NatMsg::new(move |ifc, poll| {
let on_done = Box::new(
move |_ifc: &mut Interface,
_poll: &Poll,
_child,
_nat_type,
res: ::Res<(UdpSock, SocketAddr)>| {
let client_addr = unwrap!(res).1;
unwrap!(addr_tx.send(client_addr));
},
);
let _ = unwrap!(UdpRendezvousClient::start(ifc, poll, sock, on_done));
})));

let client_addr = unwrap!(addr_rx.recv());
assert_eq!(client_addr, exp_client_addr);
}
}
Loading

0 comments on commit c7f99c7

Please sign in to comment.