From 996f69891c9b4d24a06cc62967f8dbce68d8dabc Mon Sep 17 00:00:00 2001 From: piaoliu <441594700@qq.com> Date: Mon, 4 Mar 2019 11:34:58 +0800 Subject: [PATCH] Report session timeout --- src/service.rs | 161 ++++--------------------------------------- src/service/event.rs | 156 +++++++++++++++++++++++++++++++++++++++++ src/session.rs | 5 ++ 3 files changed, 176 insertions(+), 146 deletions(-) create mode 100644 src/service/event.rs diff --git a/src/service.rs b/src/service.rs index c8fbdcae..b2f5b7df 100644 --- a/src/service.rs +++ b/src/service.rs @@ -8,7 +8,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::{ error::{self, Error as ErrorTrait}, - fmt, io, + io, time::Duration, }; use tokio::net::{ @@ -38,8 +38,12 @@ use crate::{ }; mod control; +mod event; -pub use crate::service::control::ServiceControl; +pub use crate::service::{ + control::ServiceControl, + event::{ServiceError, ServiceEvent, ServiceTask}, +}; /// Protocol handle value pub(crate) enum ProtocolHandle { @@ -49,149 +53,6 @@ pub(crate) enum ProtocolHandle { Session(Box), } -/// Error generated by the Service -#[derive(Debug)] -pub enum ServiceError { - /// When dial remote error - DialerError { - /// Remote address - address: Multiaddr, - /// error - error: Error, - }, - /// When listen error - ListenError { - /// Listen address - address: Multiaddr, - /// error - error: Error, - }, - /// Protocol select fail - ProtocolSelectError { - /// Protocol name, if none, timeout or other net problem, - /// if Some, don't support this proto - proto_name: Option, - /// Session context - session_context: SessionContext, - }, - /// Protocol error during interaction - ProtocolError { - /// Session id - id: SessionId, - /// Protocol id - proto_id: ProtocolId, - /// Codec error - error: Error, - }, -} - -/// Event generated by the Service -#[derive(Debug)] -pub enum ServiceEvent { - /// A session close - SessionClose { - /// Session id - id: SessionId, - }, - /// A session open - SessionOpen { - /// Session id - id: SessionId, - /// Remote address - address: Multiaddr, - /// Outbound or Inbound - ty: SessionType, - /// Remote public key - public_key: Option, - }, -} - -/// Task received by the Service. -/// -/// An instruction that the outside world can send to the service -pub enum ServiceTask { - /// Send protocol data task - ProtocolMessage { - /// Specify which sessions to send to, - /// None means broadcast - session_ids: Option>, - /// protocol id - proto_id: ProtocolId, - /// data - data: Vec, - }, - /// Service-level notify task - ProtocolNotify { - /// Protocol id - proto_id: ProtocolId, - /// Notify token - token: u64, - }, - /// Session-level notify task - ProtocolSessionNotify { - /// Session id - session_id: SessionId, - /// Protocol id - proto_id: ProtocolId, - /// Notify token - token: u64, - }, - /// Future task - FutureTask { - /// Future - task: Box + 'static + Send>, - }, - /// Disconnect task - Disconnect { - /// Session id - session_id: SessionId, - }, - /// Dial task - Dial { - /// Remote address - address: Multiaddr, - }, - /// Listen task - Listen { - /// Listen address - address: Multiaddr, - }, -} - -impl fmt::Debug for ServiceTask { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - use self::ServiceTask::*; - - match self { - ProtocolMessage { - session_ids, - proto_id, - data, - } => write!( - f, - "id: {:?}, protoid: {}, message: {:?}", - session_ids, proto_id, data - ), - ProtocolNotify { proto_id, token } => { - write!(f, "protocol id: {}, token: {}", proto_id, token) - } - ProtocolSessionNotify { - session_id, - proto_id, - token, - } => write!( - f, - "session id: {}, protocol id: {}, token: {}", - session_id, proto_id, token - ), - FutureTask { .. } => write!(f, "Future task"), - Disconnect { session_id } => write!(f, "Disconnect session [{}]", session_id), - Dial { address } => write!(f, "Dial address: {}", address), - Listen { address } => write!(f, "Listen address: {}", address), - } - } -} - /// An abstraction of p2p service, currently only supports TCP protocol pub struct Service { protocol_configs: Arc + Send + Sync>>>, @@ -1031,7 +892,7 @@ where } => self.protocol_open(id, proto_id, version), SessionEvent::ProtocolClose { id, proto_id, .. } => self.protocol_close(id, proto_id), SessionEvent::ProtocolSelectError { id, proto_name } => { - if let Some(session_context) = self.sessions.get(&id).cloned() { + if let Some(session_context) = self.sessions.get(&id) { self.handle.handle_error( &mut self.service_context, ServiceError::ProtocolSelectError { @@ -1061,6 +922,14 @@ where &mut self.service_context, ServiceError::ListenError { address, error }, ), + SessionEvent::SessionTimeout { id } => { + if let Some(session_context) = self.sessions.get(&id) { + self.handle.handle_error( + &mut self.service_context, + ServiceError::SessionTimeout { session_context }, + ) + } + } SessionEvent::DNSResolverSuccess { ty, address } => { self.task_count -= 1; match ty { diff --git a/src/service/event.rs b/src/service/event.rs new file mode 100644 index 00000000..d3349844 --- /dev/null +++ b/src/service/event.rs @@ -0,0 +1,156 @@ +use futures::Future; +use std::fmt; + +use crate::{ + context::SessionContext, error::Error, multiaddr::Multiaddr, secio::PublicKey, + yamux::session::SessionType, ProtocolId, SessionId, +}; + +/// Error generated by the Service +#[derive(Debug)] +pub enum ServiceError<'a> { + /// When dial remote error + DialerError { + /// Remote address + address: Multiaddr, + /// error + error: Error, + }, + /// When listen error + ListenError { + /// Listen address + address: Multiaddr, + /// error + error: Error, + }, + /// Protocol select fail + ProtocolSelectError { + /// Protocol name, if none, timeout or other net problem, + /// if Some, don't support this proto + proto_name: Option, + /// Session context + session_context: &'a SessionContext, + }, + /// Protocol error during interaction + ProtocolError { + /// Session id + id: SessionId, + /// Protocol id + proto_id: ProtocolId, + /// Codec error + error: Error, + }, + /// After initializing the connection, the session does not open any protocol, + /// suspected fd attack + SessionTimeout { + /// Session context + session_context: &'a SessionContext, + }, +} + +/// Event generated by the Service +#[derive(Debug)] +pub enum ServiceEvent { + /// A session close + SessionClose { + /// Session id + id: SessionId, + }, + /// A session open + SessionOpen { + /// Session id + id: SessionId, + /// Remote address + address: Multiaddr, + /// Outbound or Inbound + ty: SessionType, + /// Remote public key + public_key: Option, + }, +} + +/// Task received by the Service. +/// +/// An instruction that the outside world can send to the service +pub enum ServiceTask { + /// Send protocol data task + ProtocolMessage { + /// Specify which sessions to send to, + /// None means broadcast + session_ids: Option>, + /// protocol id + proto_id: ProtocolId, + /// data + data: Vec, + }, + /// Service-level notify task + ProtocolNotify { + /// Protocol id + proto_id: ProtocolId, + /// Notify token + token: u64, + }, + /// Session-level notify task + ProtocolSessionNotify { + /// Session id + session_id: SessionId, + /// Protocol id + proto_id: ProtocolId, + /// Notify token + token: u64, + }, + /// Future task + FutureTask { + /// Future + task: Box + 'static + Send>, + }, + /// Disconnect task + Disconnect { + /// Session id + session_id: SessionId, + }, + /// Dial task + Dial { + /// Remote address + address: Multiaddr, + }, + /// Listen task + Listen { + /// Listen address + address: Multiaddr, + }, +} + +impl fmt::Debug for ServiceTask { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use self::ServiceTask::*; + + match self { + ProtocolMessage { + session_ids, + proto_id, + data, + } => write!( + f, + "id: {:?}, protoid: {}, message: {:?}", + session_ids, proto_id, data + ), + ProtocolNotify { proto_id, token } => { + write!(f, "protocol id: {}, token: {}", proto_id, token) + } + ProtocolSessionNotify { + session_id, + proto_id, + token, + } => write!( + f, + "session id: {}, protocol id: {}, token: {}", + session_id, proto_id, token + ), + FutureTask { .. } => write!(f, "Future task"), + Disconnect { session_id } => write!(f, "Disconnect session [{}]", session_id), + Dial { address } => write!(f, "Dial address: {}", address), + Listen { address } => write!(f, "Listen address: {}", address), + } + } +} diff --git a/src/session.rs b/src/session.rs index cfcc1fd4..8d352e8c 100644 --- a/src/session.rs +++ b/src/session.rs @@ -107,6 +107,10 @@ pub(crate) enum SessionEvent { /// proto_name proto_name: Option, }, + SessionTimeout { + /// Session id + id: SessionId, + }, /// Codec error ProtocolError { /// Session id @@ -506,6 +510,7 @@ where match check.poll() { Ok(Async::Ready(_)) => { if self.sub_streams.is_empty() { + self.event_output(SessionEvent::SessionTimeout { id: self.id }); self.dead = true; } }