From a23308dd9d7f767a00eefb1be029a1ee773da73f Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sun, 30 Oct 2016 08:24:19 -0700 Subject: [PATCH] Added clean stop logic on server and peers. Broken unit test. --- p2p/src/peer.rs | 4 ++++ p2p/src/protocol.rs | 38 ++++++++++++++++++++++--------------- p2p/src/server.rs | 18 +++++++++++++----- p2p/src/types.rs | 2 ++ p2p/tests/network_conn.rs | 40 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 82 insertions(+), 20 deletions(-) create mode 100644 p2p/tests/network_conn.rs diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 1433e61075..2ea61201a8 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -48,4 +48,8 @@ impl Peer { pub fn run(&self, na: &NetAdapter) -> Option { self.proto.handle(na) } + + pub fn stop(&self) { + self.proto.as_ref().close() + } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index e0430de475..64c8c1ee5d 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -17,7 +17,7 @@ use std::ops::DerefMut; use std::rc::Rc; use mioco; -use mioco::sync::mpsc::sync_channel; +use mioco::sync::mpsc::{sync_channel, SyncSender}; use mioco::tcp::{TcpStream, Shutdown}; use core::ser; @@ -27,12 +27,19 @@ use types::*; pub struct ProtocolV1 { conn: RefCell, + //msg_send: Option>, + stop_send: RefCell>>, } impl Protocol for ProtocolV1 { fn handle(&self, server: &NetAdapter) -> Option { - // setup a channel so we can switch between reads and writes - let (send, recv) = sync_channel(10); + // setup channels so we can switch between reads, writes and close + let (msg_send, msg_recv) = sync_channel(10); + let (stop_send, stop_recv) = sync_channel(1); + + //self.msg_send = Some(msg_send); + let mut stop_mut = self.stop_send.borrow_mut(); + *stop_mut = Some(stop_send); let mut conn = self.conn.borrow_mut(); loop { @@ -43,25 +50,26 @@ impl Protocol for ProtocolV1 { continue; } }, - r:recv => { - ser::serialize(conn.deref_mut(), recv.recv().unwrap()); + r:msg_recv => { + ser::serialize(conn.deref_mut(), msg_recv.recv().unwrap()); + }, + r:stop_recv => { + stop_recv.recv(); + conn.shutdown(Shutdown::Both); + return None;; } ); } } + + fn close(&self) { + let stop_send = self.stop_send.borrow(); + stop_send.as_ref().unwrap().send(0); + } } impl ProtocolV1 { pub fn new(conn: TcpStream) -> ProtocolV1 { - ProtocolV1 { conn: RefCell::new(conn) } + ProtocolV1 { conn: RefCell::new(conn), /* msg_send: None, */ stop_send: RefCell::new(None) } } - - // fn close(&mut self, err_code: u32, explanation: &'static str) { - // ser::serialize(self.conn, - // &PeerError { - // code: err_code, - // message: explanation.to_string(), - // }); - // self.conn.shutdown(Shutdown::Both); - // } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index d054889251..d8ec1b0335 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -18,6 +18,7 @@ use std::cell::RefCell; use std::io; use std::net::SocketAddr; +use std::ops::Deref; use std::str::FromStr; use std::sync::{Arc, RwLock}; @@ -52,7 +53,7 @@ impl Server { } /// Creates a new p2p server. Opens a TCP port to allow incoming /// connections and starts the bootstrapping process to find peers. - pub fn start(&'static self) -> Result<(), Error> { + pub fn start(& self) -> Result<(), Error> { mioco::spawn(move || -> io::Result<()> { let addr = DEFAULT_LISTEN_ADDR.parse().unwrap(); let listener = try!(TcpListener::bind(&addr)); @@ -67,10 +68,10 @@ impl Server { mioco::spawn(move || -> io::Result<()> { let peer = try!(Peer::connect(conn, &hs).map_err(|_| io::Error::last_os_error())); let wpeer = Arc::new(peer); - { - let mut peers = self.peers.write().unwrap(); - peers.push(wpeer.clone()); - } + // { + // let mut peers = self.peers.write().unwrap(); + // peers.push(wpeer.clone()); + // } if let Some(err) = wpeer.run(&DummyAdapter{}) { error!("{:?}", err); } @@ -82,6 +83,13 @@ impl Server { Ok(()) } + pub fn stop(&self) { + let peers = self.peers.write().unwrap(); + for p in peers.deref() { + p.stop(); + } + } + /// Simulates an unrelated client connecting to our server. Mostly used for /// tests. pub fn connect_as_client(addr: SocketAddr) -> Result { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 4babfb4b51..37abbd930e 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -41,6 +41,8 @@ pub trait Protocol { /// Starts handling protocol communication, the peer(s) is expected to be /// known already, usually passed during construction. fn handle(&self, na: &NetAdapter) -> Option; + + fn close(&self); } /// Bridge between the networking layer and the rest of the system. Handles the diff --git a/p2p/tests/network_conn.rs b/p2p/tests/network_conn.rs new file mode 100644 index 0000000000..44d4c66baf --- /dev/null +++ b/p2p/tests/network_conn.rs @@ -0,0 +1,40 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +extern crate grin_p2p as p2p; +extern crate mioco; +extern crate env_logger; + +use std::io; +use std::thread; +use std::time; + +#[test] +fn peer_handshake() { + env_logger::init().unwrap(); + + mioco::start(|| -> io::Result<()> { + let server = p2p::Server::new(); + server.start(); + + // given server a little time to start + mioco::sleep(time::Duration::from_millis(200)); + + let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap(); + try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error())); + + server.stop(); + mioco::shutdown(); + }).unwrap().unwrap(); +}