Skip to content

Commit

Permalink
Merge pull request #57 from nervosnetwork/report-session-timeout
Browse files Browse the repository at this point in the history
Report session timeout
  • Loading branch information
TheWaWaR authored Mar 4, 2019
2 parents 562adc4 + 996f698 commit cdc8343
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 146 deletions.
161 changes: 15 additions & 146 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::{
error::{self, Error as ErrorTrait},
fmt, io,
io,
time::Duration,
};
use tokio::net::{
Expand Down Expand Up @@ -38,8 +38,12 @@ use crate::{
};

mod control;
mod event;

pub use crate::service::control::ServiceControl;
pub use crate::service::{
control::ServiceControl,
event::{ServiceError, ServiceEvent, ServiceTask},
};

/// Protocol handle value
pub(crate) enum ProtocolHandle {
Expand All @@ -49,149 +53,6 @@ pub(crate) enum ProtocolHandle {
Session(Box<dyn SessionProtocol + Send + 'static>),
}

/// Error generated by the Service
#[derive(Debug)]
pub enum ServiceError {
/// When dial remote error
DialerError {
/// Remote address
address: Multiaddr,
/// error
error: Error<ServiceTask>,
},
/// When listen error
ListenError {
/// Listen address
address: Multiaddr,
/// error
error: Error<ServiceTask>,
},
/// Protocol select fail
ProtocolSelectError {
/// Protocol name, if none, timeout or other net problem,
/// if Some, don't support this proto
proto_name: Option<String>,
/// Session context
session_context: SessionContext,
},
/// Protocol error during interaction
ProtocolError {
/// Session id
id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Codec error
error: Error<ServiceTask>,
},
}

/// Event generated by the Service
#[derive(Debug)]
pub enum ServiceEvent {
/// A session close
SessionClose {
/// Session id
id: SessionId,
},
/// A session open
SessionOpen {
/// Session id
id: SessionId,
/// Remote address
address: Multiaddr,
/// Outbound or Inbound
ty: SessionType,
/// Remote public key
public_key: Option<PublicKey>,
},
}

/// Task received by the Service.
///
/// An instruction that the outside world can send to the service
pub enum ServiceTask {
/// Send protocol data task
ProtocolMessage {
/// Specify which sessions to send to,
/// None means broadcast
session_ids: Option<Vec<SessionId>>,
/// protocol id
proto_id: ProtocolId,
/// data
data: Vec<u8>,
},
/// Service-level notify task
ProtocolNotify {
/// Protocol id
proto_id: ProtocolId,
/// Notify token
token: u64,
},
/// Session-level notify task
ProtocolSessionNotify {
/// Session id
session_id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Notify token
token: u64,
},
/// Future task
FutureTask {
/// Future
task: Box<dyn Future<Item = (), Error = ()> + 'static + Send>,
},
/// Disconnect task
Disconnect {
/// Session id
session_id: SessionId,
},
/// Dial task
Dial {
/// Remote address
address: Multiaddr,
},
/// Listen task
Listen {
/// Listen address
address: Multiaddr,
},
}

impl fmt::Debug for ServiceTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::ServiceTask::*;

match self {
ProtocolMessage {
session_ids,
proto_id,
data,
} => write!(
f,
"id: {:?}, protoid: {}, message: {:?}",
session_ids, proto_id, data
),
ProtocolNotify { proto_id, token } => {
write!(f, "protocol id: {}, token: {}", proto_id, token)
}
ProtocolSessionNotify {
session_id,
proto_id,
token,
} => write!(
f,
"session id: {}, protocol id: {}, token: {}",
session_id, proto_id, token
),
FutureTask { .. } => write!(f, "Future task"),
Disconnect { session_id } => write!(f, "Disconnect session [{}]", session_id),
Dial { address } => write!(f, "Dial address: {}", address),
Listen { address } => write!(f, "Listen address: {}", address),
}
}
}

/// An abstraction of p2p service, currently only supports TCP protocol
pub struct Service<T, U> {
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>>,
Expand Down Expand Up @@ -1031,7 +892,7 @@ where
} => self.protocol_open(id, proto_id, version),
SessionEvent::ProtocolClose { id, proto_id, .. } => self.protocol_close(id, proto_id),
SessionEvent::ProtocolSelectError { id, proto_name } => {
if let Some(session_context) = self.sessions.get(&id).cloned() {
if let Some(session_context) = self.sessions.get(&id) {
self.handle.handle_error(
&mut self.service_context,
ServiceError::ProtocolSelectError {
Expand Down Expand Up @@ -1061,6 +922,14 @@ where
&mut self.service_context,
ServiceError::ListenError { address, error },
),
SessionEvent::SessionTimeout { id } => {
if let Some(session_context) = self.sessions.get(&id) {
self.handle.handle_error(
&mut self.service_context,
ServiceError::SessionTimeout { session_context },
)
}
}
SessionEvent::DNSResolverSuccess { ty, address } => {
self.task_count -= 1;
match ty {
Expand Down
156 changes: 156 additions & 0 deletions src/service/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use futures::Future;
use std::fmt;

use crate::{
context::SessionContext, error::Error, multiaddr::Multiaddr, secio::PublicKey,
yamux::session::SessionType, ProtocolId, SessionId,
};

/// Error generated by the Service
#[derive(Debug)]
pub enum ServiceError<'a> {
/// When dial remote error
DialerError {
/// Remote address
address: Multiaddr,
/// error
error: Error<ServiceTask>,
},
/// When listen error
ListenError {
/// Listen address
address: Multiaddr,
/// error
error: Error<ServiceTask>,
},
/// Protocol select fail
ProtocolSelectError {
/// Protocol name, if none, timeout or other net problem,
/// if Some, don't support this proto
proto_name: Option<String>,
/// Session context
session_context: &'a SessionContext,
},
/// Protocol error during interaction
ProtocolError {
/// Session id
id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Codec error
error: Error<ServiceTask>,
},
/// After initializing the connection, the session does not open any protocol,
/// suspected fd attack
SessionTimeout {
/// Session context
session_context: &'a SessionContext,
},
}

/// Event generated by the Service
#[derive(Debug)]
pub enum ServiceEvent {
/// A session close
SessionClose {
/// Session id
id: SessionId,
},
/// A session open
SessionOpen {
/// Session id
id: SessionId,
/// Remote address
address: Multiaddr,
/// Outbound or Inbound
ty: SessionType,
/// Remote public key
public_key: Option<PublicKey>,
},
}

/// Task received by the Service.
///
/// An instruction that the outside world can send to the service
pub enum ServiceTask {
/// Send protocol data task
ProtocolMessage {
/// Specify which sessions to send to,
/// None means broadcast
session_ids: Option<Vec<SessionId>>,
/// protocol id
proto_id: ProtocolId,
/// data
data: Vec<u8>,
},
/// Service-level notify task
ProtocolNotify {
/// Protocol id
proto_id: ProtocolId,
/// Notify token
token: u64,
},
/// Session-level notify task
ProtocolSessionNotify {
/// Session id
session_id: SessionId,
/// Protocol id
proto_id: ProtocolId,
/// Notify token
token: u64,
},
/// Future task
FutureTask {
/// Future
task: Box<dyn Future<Item = (), Error = ()> + 'static + Send>,
},
/// Disconnect task
Disconnect {
/// Session id
session_id: SessionId,
},
/// Dial task
Dial {
/// Remote address
address: Multiaddr,
},
/// Listen task
Listen {
/// Listen address
address: Multiaddr,
},
}

impl fmt::Debug for ServiceTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::ServiceTask::*;

match self {
ProtocolMessage {
session_ids,
proto_id,
data,
} => write!(
f,
"id: {:?}, protoid: {}, message: {:?}",
session_ids, proto_id, data
),
ProtocolNotify { proto_id, token } => {
write!(f, "protocol id: {}, token: {}", proto_id, token)
}
ProtocolSessionNotify {
session_id,
proto_id,
token,
} => write!(
f,
"session id: {}, protocol id: {}, token: {}",
session_id, proto_id, token
),
FutureTask { .. } => write!(f, "Future task"),
Disconnect { session_id } => write!(f, "Disconnect session [{}]", session_id),
Dial { address } => write!(f, "Dial address: {}", address),
Listen { address } => write!(f, "Listen address: {}", address),
}
}
}
5 changes: 5 additions & 0 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ pub(crate) enum SessionEvent {
/// proto_name
proto_name: Option<String>,
},
SessionTimeout {
/// Session id
id: SessionId,
},
/// Codec error
ProtocolError {
/// Session id
Expand Down Expand Up @@ -506,6 +510,7 @@ where
match check.poll() {
Ok(Async::Ready(_)) => {
if self.sub_streams.is_empty() {
self.event_output(SessionEvent::SessionTimeout { id: self.id });
self.dead = true;
}
}
Expand Down

0 comments on commit cdc8343

Please sign in to comment.