diff --git a/core/src/core/block.rs b/core/src/core/block.rs index 5ad9d4feb6..e180131713 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -218,7 +218,7 @@ impl Block { // repeated iterations, revisit if a problem // validate each transaction and gather their proofs - let mut proofs = try_oap_vec!(txs, |tx| tx.verify_sig(&secp)); + let mut proofs = try_map_vec!(txs, |tx| tx.verify_sig(&secp)); proofs.push(reward_proof); // build vectors with all inputs and all outputs, ordering them by hash diff --git a/core/src/macros.rs b/core/src/macros.rs index c2f2588fd4..59d3017e47 100644 --- a/core/src/macros.rs +++ b/core/src/macros.rs @@ -29,7 +29,7 @@ macro_rules! map_vec { /// Same as map_vec when the map closure returns Results. Makes sure the /// results are "pushed up" and wraps with a try. #[macro_export] -macro_rules! try_oap_vec { +macro_rules! try_map_vec { ($thing:expr, $mapfn:expr ) => { try!($thing.iter() .map($mapfn) diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 0629c109db..983827a39e 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -17,12 +17,12 @@ use std::sync::RwLock; use rand::Rng; use rand::os::OsRng; +use mioco::tcp::{TcpStream, Shutdown}; use core::ser::{serialize, deserialize, Error}; use msg::*; use types::*; use protocol::ProtocolV1; -use peer::PeerConn; const NONCES_CAP: usize = 100; @@ -44,20 +44,20 @@ impl Handshake { } /// Handles connecting to a new remote peer, starting the version handshake. - pub fn connect<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { + pub fn connect(&self, mut conn: TcpStream) -> Result<(Box, PeerInfo), Error> { // get a new nonce that can be used on handshake to detect self-connection let nonce = self.next_nonce(); // send the first part of the handshake - let sender_addr = SockAddr(peer.local_addr()); - let receiver_addr = SockAddr(peer.peer_addr()); - let opt_err = serialize(peer, + let sender_addr = conn.local_addr().unwrap(); + let receiver_addr = conn.peer_addr().unwrap(); + let opt_err = serialize(&mut conn, &Hand { version: PROTOCOL_VERSION, capabilities: FULL_SYNC, nonce: nonce, - sender_addr: sender_addr, - receiver_addr: receiver_addr, + sender_addr: SockAddr(sender_addr), + receiver_addr: SockAddr(receiver_addr), user_agent: USER_AGENT.to_string(), }); match opt_err { @@ -66,9 +66,9 @@ impl Handshake { } // deserialize the handshake response and do version negotiation - let shake = try!(deserialize::(peer)); + let shake = try!(deserialize::(&mut conn)); if shake.version != 1 { - self.close(peer, + self.close(&mut conn, ErrCodes::UnsupportedVersion as u32, format!("Unsupported version: {}, ours: {})", shake.version, @@ -78,61 +78,70 @@ impl Handshake { received: vec![shake.version as u8], }); } - peer.capabilities = shake.capabilities; - peer.user_agent = shake.user_agent; - info!("Connected to peer {}", peer); + let peer_info = PeerInfo{ + capabilities: shake.capabilities, + user_agent: shake.user_agent, + addr: receiver_addr, + version: shake.version, + }; + + info!("Connected to peer {:?}", peer_info); // when more than one protocol version is supported, choosing should go here - Ok(Box::new(ProtocolV1::new(peer))) + Ok((Box::new(ProtocolV1::new(conn)), peer_info)) } /// Handles receiving a connection from a new remote peer that started the /// version handshake. - pub fn handshake<'a>(&'a self, peer: &'a mut PeerConn) -> Result, Error> { - // deserialize first part of handshake sent to us and do version negotiation - let hand = try!(deserialize::(peer)); - if hand.version != 1 { - self.close(peer, - ErrCodes::UnsupportedVersion as u32, - format!("Unsupported version: {}, ours: {})", - hand.version, - PROTOCOL_VERSION)); - return Err(Error::UnexpectedData { - expected: vec![PROTOCOL_VERSION as u8], - received: vec![hand.version as u8], - }); - } - { - // check the nonce to see if we could be trying to connect to ourselves - let nonces = self.nonces.read().unwrap(); - if nonces.contains(&hand.nonce) { - return Err(Error::UnexpectedData { - expected: vec![], - received: vec![], - }); - } - } - - // all good, keep peer info - peer.capabilities = hand.capabilities; - peer.user_agent = hand.user_agent; - - // send our reply with our info - let opt_err = serialize(peer, - &Shake { - version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, - user_agent: USER_AGENT.to_string(), - }); - match opt_err { - Some(err) => return Err(err), - None => {} - } - - info!("Received connection from peer {}", peer); - // when more than one protocol version is supported, choosing should go here - Ok(Box::new(ProtocolV1::new(peer))) - } + pub fn handshake(&self, mut conn: TcpStream) -> Result<(Box, PeerInfo), Error> { + // deserialize first part of handshake sent to us and do version negotiation + let hand = try!(deserialize::(&mut conn)); + if hand.version != 1 { + self.close(&mut conn, + ErrCodes::UnsupportedVersion as u32, + format!("Unsupported version: {}, ours: {})", + hand.version, + PROTOCOL_VERSION)); + return Err(Error::UnexpectedData { + expected: vec![PROTOCOL_VERSION as u8], + received: vec![hand.version as u8], + }); + } + { + // check the nonce to see if we could be trying to connect to ourselves + let nonces = self.nonces.read().unwrap(); + if nonces.contains(&hand.nonce) { + return Err(Error::UnexpectedData { + expected: vec![], + received: vec![], + }); + } + } + + // all good, keep peer info + let peer_info = PeerInfo{ + capabilities: hand.capabilities, + user_agent: hand.user_agent, + addr: conn.peer_addr().unwrap(), + version: hand.version, + }; + + // send our reply with our info + let opt_err = serialize(&mut conn, + &Shake { + version: PROTOCOL_VERSION, + capabilities: FULL_SYNC, + user_agent: USER_AGENT.to_string(), + }); + match opt_err { + Some(err) => return Err(err), + None => {} + } + + info!("Received connection from peer {:?}", peer_info); + // when more than one protocol version is supported, choosing should go here + Ok((Box::new(ProtocolV1::new(conn)), peer_info)) + } /// Generate a new random nonce and store it in our ring buffer fn next_nonce(&self) -> u64 { @@ -147,12 +156,12 @@ impl Handshake { nonce } - fn close(&self, peer: &mut PeerConn, err_code: u32, explanation: String) { - serialize(peer, + fn close(&self, conn: &mut TcpStream, err_code: u32, explanation: String) { + serialize(conn, &PeerError { code: err_code, message: explanation, }); - peer.close(); + conn.shutdown(Shutdown::Both); } } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 02904c23e3..bd51f079d4 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -27,6 +27,7 @@ extern crate bitflags; extern crate grin_core as core; #[macro_use] extern crate log; +#[macro_use] extern crate mioco; extern crate rand; extern crate time; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 8e120e136f..d5f21aa502 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -18,6 +18,8 @@ use std::net::*; use core::ser::{self, Writeable, Readable, Writer, Reader, Error}; +use types::*; + /// Current latest version of the protocol pub const PROTOCOL_VERSION: u32 = 1; /// Grin's user agent with current version (TODO externalize) @@ -31,16 +33,6 @@ pub enum ErrCodes { UnsupportedVersion = 100, } -bitflags! { - /// Options for block validation - pub flags Capabilities: u32 { - /// We don't know (yet) what the peer can do. - const UNKNOWN = 0b00000000, - /// Runs with the easier version of the Proof of Work, mostly to make testing easier. - const FULL_SYNC = 0b00000001, - } -} - /// Types of messages #[derive(Clone, Copy)] pub enum Type { @@ -49,6 +41,8 @@ pub enum Type { SHAKE, PING, PONG, + GETPEERADDRS, + PEERADDRS, /// Never used over the network but to detect unrecognized types. MaxMsgType, } @@ -92,6 +86,8 @@ impl Readable for MsgHeader { 2 => Type::SHAKE, 3 => Type::PING, 4 => Type::PONG, + 5 => Type::GETPEERADDRS, + 6 => Type::PEERADDRS, _ => panic!(), }, }) @@ -180,6 +176,54 @@ impl Readable for Shake { } } +/// Ask for other peers addresses, required for network discovery. +pub struct GetPeerAddrs { + /// Filters on the capabilities we'd like the peers to have + pub capabilities: Capabilities, +} + +impl Writeable for GetPeerAddrs { + fn write(&self, writer: &mut Writer) -> Option { + writer.write_u32(self.capabilities.bits()) + } +} + +impl Readable for GetPeerAddrs { + fn read(reader: &mut Reader) -> Result { + let capab = try!(reader.read_u32()); + let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); + Ok(GetPeerAddrs { capabilities: capabilities }) + } +} + +/// Peer addresses we know of that are fresh enough, in response to +/// GetPeerAddrs. +pub struct PeerAddrs { + pub peers: Vec, +} + +impl Writeable for PeerAddrs { + fn write(&self, writer: &mut Writer) -> Option { + try_o!(writer.write_u32(self.peers.len() as u32)); + for p in &self.peers { + p.write(writer); + } + None + } +} + +impl Readable for PeerAddrs { + fn read(reader: &mut Reader) -> Result { + let peer_count = try!(reader.read_u32()); + if peer_count > 1000 { + return Err(ser::Error::TooLargeReadErr(format!("Too many peers provided: {}", + peer_count))); + } + let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader)); + Ok(PeerAddrs { peers: peers }) + } +} + /// We found some issue in the communication, sending an error back, usually /// followed by closing the connection. pub struct PeerError { @@ -247,7 +291,7 @@ impl Readable for SockAddr { ip[3]), port)))) } else { - let ip = try_oap_vec!([0..8], |_| reader.read_u16()); + let ip = try_map_vec!([0..8], |_| reader.read_u16()); let port = try!(reader.read_u16()); Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(ip[0], ip[1], diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 28ebb9fb29..1433e61075 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,88 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; -use std::io::{self, Read, Write}; -use std::net::SocketAddr; +use mioco::tcp::TcpStream; -use mioco::tcp::{TcpStream, Shutdown}; - -use handshake::Handshake; use core::ser::Error; +use handshake::Handshake; use msg::*; use types::*; -/// The local representation of a remotely connected peer. Handles most -/// low-level network communication and tracks peer information. -pub struct PeerConn { - conn: TcpStream, - pub capabilities: Capabilities, - pub user_agent: String, -} - -impl fmt::Display for PeerConn { - // This trait requires `fmt` with this exact signature. - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{} {}", self.peer_addr(), self.user_agent) - } -} - -/// Make the Peer a Reader for convenient access to the underlying connection. -/// Allows the peer to track how much is received. -impl Read for PeerConn { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.conn.read(buf) - } -} - -/// Make the Peer a Writer for convenient access to the underlying connection. -/// Allows the peer to track how much is sent. -impl Write for PeerConn { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.conn.write(buf) - } - fn flush(&mut self) -> io::Result<()> { - self.conn.flush() - } -} - -impl Close for PeerConn { - fn close(&self) { - self.conn.shutdown(Shutdown::Both); - } -} - -impl PeerConn { - /// Create a new local peer instance connected to a remote peer with the - /// provided TcpStream. - pub fn new(conn: TcpStream) -> PeerConn { - // don't wait on read for more than 2 seconds by default - conn.set_keepalive(Some(2)); - - PeerConn { - conn: conn, - capabilities: UNKNOWN, - user_agent: "".to_string(), - } - } - - pub fn connect(&mut self, hs: &Handshake, na: &NetAdapter) -> Option { - let mut proto = try_to_o!(hs.connect(self)); - proto.handle(na) - } - - pub fn handshake(&mut self, hs: &Handshake, na: &NetAdapter) -> Option { - let mut proto = try_to_o!(hs.handshake(self)); - proto.handle(na) - } +pub struct Peer { + info: PeerInfo, + proto: Box, } -impl PeerInfo for PeerConn { - fn peer_addr(&self) -> SocketAddr { - self.conn.peer_addr().unwrap() - } - fn local_addr(&self) -> SocketAddr { - // TODO likely not exactly what we want (private vs public IP) - self.conn.local_addr().unwrap() - } +unsafe impl Sync for Peer {} +unsafe impl Send for Peer {} + +impl Peer { + + pub fn connect(conn: TcpStream, hs: &Handshake) -> Result { + let (proto, info) = try!(hs.connect(conn)); + Ok(Peer{ + info: info, + proto: proto, + }) + } + + pub fn accept(conn: TcpStream, hs: &Handshake) -> Result { + let (proto, info) = try!(hs.handshake(conn)); + Ok(Peer{ + info: info, + proto: proto, + }) + } + + pub fn run(&self, na: &NetAdapter) -> Option { + self.proto.handle(na) + } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 039989ae9e..e0430de475 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -12,37 +12,56 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::RefCell; +use std::ops::DerefMut; +use std::rc::Rc; + +use mioco; +use mioco::sync::mpsc::sync_channel; +use mioco::tcp::{TcpStream, Shutdown}; + use core::ser; +use handshake::Handshake; use msg::*; use types::*; -use peer::PeerConn; -pub struct ProtocolV1<'a> { - peer: &'a mut PeerConn, +pub struct ProtocolV1 { + conn: RefCell, } -impl<'a> Protocol for ProtocolV1<'a> { - fn handle(&mut self, server: &NetAdapter) -> Option { +impl Protocol for ProtocolV1 { + fn handle(&self, server: &NetAdapter) -> Option { + // setup a channel so we can switch between reads and writes + let (send, recv) = sync_channel(10); + + let mut conn = self.conn.borrow_mut(); loop { - let header = try_to_o!(ser::deserialize::(self.peer)); - if !header.acceptable() { - continue; - } + select!( + r:conn => { + let header = try_to_o!(ser::deserialize::(conn.deref_mut())); + if !header.acceptable() { + continue; + } + }, + r:recv => { + ser::serialize(conn.deref_mut(), recv.recv().unwrap()); + } + ); } } } -impl<'a> ProtocolV1<'a> { - pub fn new(p: &mut PeerConn) -> ProtocolV1 { - ProtocolV1 { peer: p } +impl ProtocolV1 { + pub fn new(conn: TcpStream) -> ProtocolV1 { + ProtocolV1 { conn: RefCell::new(conn) } } - fn close(&mut self, err_code: u32, explanation: &'static str) { - ser::serialize(self.peer, - &PeerError { - code: err_code, - message: explanation.to_string(), - }); - self.peer.close(); - } + // fn close(&mut self, err_code: u32, explanation: &'static str) { + // ser::serialize(self.conn, + // &PeerError { + // code: err_code, + // message: explanation.to_string(), + // }); + // self.conn.shutdown(Shutdown::Both); + // } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index b6b1652e43..d054889251 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -15,17 +15,18 @@ //! Grin server implementation, accepts incoming connections and connects to //! other peers in the network. +use std::cell::RefCell; use std::io; use std::net::SocketAddr; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use mioco; use mioco::tcp::{TcpListener, TcpStream}; use core::ser::Error; use handshake::Handshake; -use peer::PeerConn; +use peer::Peer; use types::*; pub const DEFAULT_LISTEN_ADDR: &'static str = "127.0.0.1:3414"; @@ -38,14 +39,20 @@ fn listen_addr() -> SocketAddr { struct DummyAdapter {} impl NetAdapter for DummyAdapter {} +/// P2P server implementation, handling bootstrapping to find and connect to +/// peers, receiving connections from other peers and keep track of all of them. pub struct Server { + peers: RwLock>>, } +// TODO TLS impl Server { + pub fn new() -> Server { + Server{peers: RwLock::new(Vec::new())} + } /// Creates a new p2p server. Opens a TCP port to allow incoming /// connections and starts the bootstrapping process to find peers. - pub fn start() -> Result { - // TODO TLS + pub fn start(&'static self) -> Result<(), Error> { mioco::spawn(move || -> io::Result<()> { let addr = DEFAULT_LISTEN_ADDR.parse().unwrap(); let listener = try!(TcpListener::bind(&addr)); @@ -55,11 +62,16 @@ impl Server { loop { let conn = try!(listener.accept()); - let hs_child = hs.clone(); + let hs = hs.clone(); mioco::spawn(move || -> io::Result<()> { - let ret = PeerConn::new(conn).handshake(&hs_child, &DummyAdapter {}); - if let Some(err) = ret { + let peer = try!(Peer::connect(conn, &hs).map_err(|_| io::Error::last_os_error())); + let wpeer = Arc::new(peer); + { + let mut peers = self.peers.write().unwrap(); + peers.push(wpeer.clone()); + } + if let Some(err) = wpeer.run(&DummyAdapter{}) { error!("{:?}", err); } Ok(()) @@ -67,18 +79,13 @@ impl Server { } Ok(()) }); - Ok(Server {}) + Ok(()) } /// Simulates an unrelated client connecting to our server. Mostly used for /// tests. - pub fn connect_as_client(addr: SocketAddr) -> Option { + pub fn connect_as_client(addr: SocketAddr) -> Result { let tcp_client = TcpStream::connect(&addr).unwrap(); - let mut peer = PeerConn::new(tcp_client); - let hs = Handshake::new(); - if let Err(e) = hs.connect(&mut peer) { - return Some(e); - } - None + Peer::accept(tcp_client, &Handshake::new()) } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 01b213b2a0..4babfb4b51 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -16,17 +16,23 @@ use std::io::{Read, Write}; use std::net::SocketAddr; use core::ser::Error; -/// Trait for pre-emptively and forcefully closing an underlying resource. -pub trait Close { - fn close(&self); +bitflags! { + /// Options for block validation + pub flags Capabilities: u32 { + /// We don't know (yet) what the peer can do. + const UNKNOWN = 0b00000000, + /// Runs with the easier version of the Proof of Work, mostly to make testing easier. + const FULL_SYNC = 0b00000001, + } } /// General information about a connected peer that's useful to other modules. -pub trait PeerInfo { - /// Address of the remote peer - fn peer_addr(&self) -> SocketAddr; - /// Our address, communicated to other peers - fn local_addr(&self) -> SocketAddr; +#[derive(Debug)] +pub struct PeerInfo { + pub capabilities: Capabilities, + pub user_agent: String, + pub version: u32, + pub addr: SocketAddr, } /// A given communication protocol agreed upon between 2 peers (usually @@ -34,7 +40,7 @@ pub trait PeerInfo { pub trait Protocol { /// Starts handling protocol communication, the peer(s) is expected to be /// known already, usually passed during construction. - fn handle(&mut self, na: &NetAdapter) -> Option; + fn handle(&self, na: &NetAdapter) -> Option; } /// Bridge between the networking layer and the rest of the system. Handles the