From 9fa1ebebb409ce896838987144bf5ecdebc38d64 Mon Sep 17 00:00:00 2001 From: piaoliu <441594700@qq.com> Date: Thu, 28 Feb 2019 11:45:41 +0800 Subject: [PATCH 1/2] Report open proto failed --- Cargo.toml | 1 + src/context.rs | 2 +- src/service.rs | 25 ++++++++-- src/session.rs | 123 +++++++++++++++++++++++++++-------------------- src/substream.rs | 3 ++ 5 files changed, 97 insertions(+), 57 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 36bc5d2b..4e38fd7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ description = "Minimal implementation for a multiplexed p2p network framework." authors = ["piaoliu <441594700@qq.com>", "Nervos Core Dev "] repository = "https://github.com/nervosnetwork/p2p" include = ["Cargo.toml", "src/*", "README.md", "LICENSE"] +readme = "README.md" keywords = ["network", "peer-to-peer"] categories = ["network-programming", "asynchronous"] edition = "2018" diff --git a/src/context.rs b/src/context.rs index c93ec024..37340cd4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -22,7 +22,7 @@ use crate::{ }; /// Session context, contains basic information about the current connection -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SessionContext { pub(crate) event_sender: mpsc::Sender, /// Session's ID diff --git a/src/service.rs b/src/service.rs index edd7b177..c8fbdcae 100644 --- a/src/service.rs +++ b/src/service.rs @@ -34,7 +34,7 @@ use crate::{ traits::{ProtocolMeta, ServiceHandle, ServiceProtocol, SessionProtocol}, utils::{dns::DNSResolver, extract_peer_id, multiaddr_to_socketaddr}, yamux::{session::SessionType, Config as YamuxConfig}, - ProtocolId, SessionId, StreamId, + ProtocolId, SessionId, }; mod control; @@ -66,10 +66,18 @@ pub enum ServiceError { /// error error: Error, }, + /// Protocol select fail + ProtocolSelectError { + /// Protocol name, if none, timeout or other net problem, + /// if Some, don't support this proto + proto_name: Option, + /// Session context + session_context: SessionContext, + }, /// Protocol error during interaction ProtocolError { - /// Stream id - id: StreamId, + /// Session id + id: SessionId, /// Protocol id proto_id: ProtocolId, /// Codec error @@ -1022,6 +1030,17 @@ where .. } => self.protocol_open(id, proto_id, version), SessionEvent::ProtocolClose { id, proto_id, .. } => self.protocol_close(id, proto_id), + SessionEvent::ProtocolSelectError { id, proto_name } => { + if let Some(session_context) = self.sessions.get(&id).cloned() { + self.handle.handle_error( + &mut self.service_context, + ServiceError::ProtocolSelectError { + proto_name, + session_context, + }, + ) + } + } SessionEvent::ProtocolError { id, proto_id, diff --git a/src/session.rs b/src/session.rs index 9f7fec62..39f20319 100644 --- a/src/session.rs +++ b/src/session.rs @@ -7,7 +7,7 @@ use log::{debug, error, trace, warn}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::{error, io, time::Duration}; -use tokio::codec::{Decoder, Encoder, Framed, FramedParts}; +use tokio::codec::{Decoder, Encoder, Framed, FramedParts, LengthDelimitedCodec}; use tokio::prelude::{AsyncRead, AsyncWrite, FutureExt}; use crate::{ @@ -95,10 +95,16 @@ pub(crate) enum SessionEvent { /// Stream id stream_id: StreamId, }, + ProtocolSelectError { + /// Session id + id: SessionId, + /// proto_name + proto_name: Option, + }, /// Codec error ProtocolError { - /// Stream id - id: StreamId, + /// Session id + id: SessionId, /// Protocol id proto_id: ProtocolId, /// Codec error @@ -181,21 +187,24 @@ where } } - /// After the session is established, the client is requested to open some custom protocol sub stream. - pub fn open_proto_stream(&mut self, proto_name: &str) { - debug!("try open proto, {}", proto_name); + /// select procedure + #[inline(always)] + fn select_procedure( + &mut self, + procedure: impl Future< + Item = ( + Framed, + String, + Option, + ), + Error = io::Error, + > + Send + + 'static, + ) { let event_sender = self.proto_event_sender.clone(); - let handle = self.socket.open_stream().unwrap(); - let versions = self - .protocol_configs - .get(proto_name) - .unwrap() - .support_versions(); - let proto_info = ProtocolInfo::new(&proto_name, versions); - - let task = client_select(handle, proto_info) - .and_then(|(handle, name, version)| { - match version { + let task = procedure.timeout(self.timeout).then(|result| { + match result { + Ok((handle, name, version)) => match version { Some(version) => { let send_task = event_sender.send(ProtocolEvent::Open { sub_stream: Box::new(handle), @@ -206,18 +215,47 @@ where error!("stream send back error: {:?}", err); })); } - None => debug!("Negotiation to open the protocol {} failed", name), + None => { + debug!("Negotiation to open the protocol {} failed", name); + let send_task = event_sender.send(ProtocolEvent::SelectError { + proto_name: Some(name), + }); + tokio::spawn(send_task.map(|_| ()).map_err(|err| { + error!("select error send back error: {:?}", err); + })); + } + }, + Err(err) => { + debug!("stream protocol select err: {:?}", err); + let send_task = + event_sender.send(ProtocolEvent::SelectError { proto_name: None }); + tokio::spawn(send_task.map(|_| ()).map_err(|err| { + error!("select error send back error: {:?}", err); + })); } - Ok(()) - }) - .timeout(self.timeout) - .map_err(|err| { - trace!("stream protocol select err: {:?}", err); - }); + } + + Ok(()) + }); tokio::spawn(task); } + /// After the session is established, the client is requested to open some custom protocol sub stream. + pub fn open_proto_stream(&mut self, proto_name: &str) { + debug!("try open proto, {}", proto_name); + let handle = self.socket.open_stream().unwrap(); + let versions = self + .protocol_configs + .get(proto_name) + .unwrap() + .support_versions(); + let proto_info = ProtocolInfo::new(&proto_name, versions); + + let task = client_select(handle, proto_info); + self.select_procedure(task); + } + /// Push the generated event to the Service #[inline] fn event_output(&mut self, event: SessionEvent) { @@ -277,7 +315,6 @@ where /// Handling client-initiated open protocol sub stream requests fn handle_sub_stream(&mut self, sub_stream: StreamHandle) { - let event_sender = self.proto_event_sender.clone(); let proto_metas = self .protocol_configs .values() @@ -288,34 +325,8 @@ where }) .collect(); - let task = server_select(sub_stream, proto_metas) - .and_then(|(handle, name, version)| { - match version { - Some(version) => { - let send_task = event_sender.send(ProtocolEvent::Open { - sub_stream: Box::new(handle), - proto_name: name, - version, - }); - - tokio::spawn(send_task.map(|_| ()).map_err(|err| { - error!("stream send back error: {:?}", err); - })); - } - None => { - // server close the connect - let _ = handle.into_inner().shutdown(); - debug!("negotiation to open the protocol [{}] failed", name); - } - } - Ok(()) - }) - .timeout(self.timeout) - .map_err(|err| { - trace!("stream protocol select err: {:?}", err); - }); - - tokio::spawn(task); + let task = server_select(sub_stream, proto_metas); + self.select_procedure(task); } /// Handling events uploaded by the protocol stream @@ -384,6 +395,12 @@ where data, }) } + ProtocolEvent::SelectError { proto_name } => { + self.event_output(SessionEvent::ProtocolSelectError { + id: self.id, + proto_name, + }) + } ProtocolEvent::Error { proto_id, error, .. } => { diff --git a/src/substream.rs b/src/substream.rs index f9671817..998162ba 100644 --- a/src/substream.rs +++ b/src/substream.rs @@ -44,6 +44,9 @@ pub(crate) enum ProtocolEvent { /// Data data: bytes::Bytes, }, + SelectError { + proto_name: Option, + }, /// Codec error Error { /// Stream id From 23930a81863cdfc620517d208c46272e1751c7f9 Mon Sep 17 00:00:00 2001 From: piaoliu <441594700@qq.com> Date: Thu, 28 Feb 2019 14:33:10 +0800 Subject: [PATCH 2/2] Set timeout check to session --- src/session.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/session.rs b/src/session.rs index 39f20319..cfcc1fd4 100644 --- a/src/session.rs +++ b/src/session.rs @@ -6,9 +6,15 @@ use futures::{ use log::{debug, error, trace, warn}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; -use std::{error, io, time::Duration}; -use tokio::codec::{Decoder, Encoder, Framed, FramedParts, LengthDelimitedCodec}; +use std::{ + error, io, + time::{Duration, Instant}, +}; use tokio::prelude::{AsyncRead, AsyncWrite, FutureExt}; +use tokio::{ + codec::{Decoder, Encoder, Framed, FramedParts, LengthDelimitedCodec}, + timer::Delay, +}; use crate::{ error::Error, @@ -120,6 +126,7 @@ pub(crate) struct Session { id: SessionId, timeout: Duration, + timeout_check: Option, dead: bool, @@ -172,6 +179,7 @@ where protocol_configs: meta.protocol_configs, id: meta.id, timeout: meta.timeout, + timeout_check: Some(Delay::new(Instant::now() + meta.timeout)), ty: meta.ty, next_stream: 0, sub_streams: HashMap::default(), @@ -494,6 +502,18 @@ where self.flush(); } + if let Some(mut check) = self.timeout_check.take() { + match check.poll() { + Ok(Async::Ready(_)) => { + if self.sub_streams.is_empty() { + self.dead = true; + } + } + Ok(Async::NotReady) => self.timeout_check = Some(check), + Err(e) => debug!("timeout check error: {}", e), + } + } + loop { match self.socket.poll() { Ok(Async::Ready(Some(sub_stream))) => self.handle_sub_stream(sub_stream),