Skip to content

Commit

Permalink
Merge pull request #105 from nervosnetwork/add-close-command
Browse files Browse the repository at this point in the history
feat: support gracefully shutting down the service...
  • Loading branch information
driftluo authored Apr 13, 2019
2 parents 90c8c01 + b71fbaa commit de913e8
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 17 deletions.
14 changes: 13 additions & 1 deletion secio/src/handshake/handshake_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::peer_id::PeerId;
use flatbuffers::FlatBufferBuilder;
use flatbuffers_verifier::get_root;

use std::fmt;

#[derive(Clone, Default, PartialEq, Ord, PartialOrd, Eq, Debug)]
pub struct Propose {
pub(crate) rand: Vec<u8>,
Expand Down Expand Up @@ -107,7 +109,7 @@ impl Exchange {
}

/// Public Key
#[derive(Clone, Debug, PartialEq, Ord, PartialOrd, Eq, Hash)]
#[derive(Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub enum PublicKey {
/// Secp256k1
Secp256k1(Vec<u8>),
Expand Down Expand Up @@ -153,6 +155,16 @@ impl PublicKey {
}
}

impl fmt::Debug for PublicKey {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "0x")?;
for byte in self.inner_ref() {
write!(f, "{:02x}", byte)?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::{Exchange, Propose, PublicKey};
Expand Down
13 changes: 13 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,19 @@ impl ServiceContext {
}
}

/// Shutdown service.
///
/// Order:
/// 1. close all listens
/// 2. try close all session's protocol stream
/// 3. try close all session
/// 4. close service
pub fn shutdown(&mut self) {
if self.inner.shutdown().is_err() {
warn!("Service is abnormally closed")
}
}

pub(crate) fn clone_self(&self) -> Self {
ServiceContext {
inner: self.inner.clone(),
Expand Down
48 changes: 32 additions & 16 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
protocol_select::ProtocolInfo,
secio::{handshake::Config, PublicKey, SecioKeyPair},
service::{
config::ServiceConfig,
config::{ServiceConfig, State},
event::ServiceTask,
future_task::{BoxedFutureTask, FutureTaskManager},
},
Expand Down Expand Up @@ -66,9 +66,8 @@ pub struct Service<T> {

dial_protocols: HashMap<Multiaddr, DialProtocol>,
config: ServiceConfig,
/// Calculate the number of connection requests that need to be sent externally,
/// if run forever, it will default to 1, else it default to 0
task_count: usize,
/// service state
state: State,

next_session: SessionId,

Expand Down Expand Up @@ -153,7 +152,7 @@ where
listens: Vec::new(),
dial_protocols: HashMap::default(),
config,
task_count: if forever { 1 } else { 0 },
state: State::new(forever),
next_session: 0,
write_buf: VecDeque::default(),
read_service_buf: VecDeque::default(),
Expand Down Expand Up @@ -228,7 +227,7 @@ where
self.pending_tasks.push_back(ServiceTask::FutureTask {
task: Box::new(task),
});
self.task_count += 1;
self.state.increase();
Ok(listen_addr)
}

Expand Down Expand Up @@ -284,7 +283,7 @@ where
self.pending_tasks.push_back(ServiceTask::FutureTask {
task: Box::new(task),
});
self.task_count += 1;
self.state.increase();
Ok(())
}

Expand Down Expand Up @@ -693,7 +692,7 @@ where
H: AsyncRead + AsyncWrite + Send + 'static,
{
if ty.is_outbound() {
self.task_count -= 1;
self.state.decrease();
}
let target = self
.dial_protocols
Expand Down Expand Up @@ -1118,7 +1117,7 @@ where
}
SessionEvent::HandshakeFail { ty, error, address } => {
if ty.is_outbound() {
self.task_count -= 1;
self.state.decrease();
self.dial_protocols.remove(&address);
self.handle.handle_error(
&mut self.service_context,
Expand Down Expand Up @@ -1161,15 +1160,15 @@ where
},
),
SessionEvent::DialError { address, error } => {
self.task_count -= 1;
self.state.decrease();
self.dial_protocols.remove(&address);
self.handle.handle_error(
&mut self.service_context,
ServiceError::DialerError { address, error },
)
}
SessionEvent::ListenError { address, error } => {
self.task_count -= 1;
self.state.decrease();
self.handle.handle_error(
&mut self.service_context,
ServiceError::ListenError { address, error },
Expand Down Expand Up @@ -1207,7 +1206,7 @@ where
},
);
self.listens.push((listen_address, incoming));
self.task_count -= 1;
self.state.decrease();
self.update_listens();
self.listen_poll();
}
Expand Down Expand Up @@ -1430,6 +1429,23 @@ where
session_id,
proto_id,
} => self.protocol_close(session_id, proto_id, Source::External),
ServiceTask::Shutdown => {
self.sessions
.keys()
.cloned()
.collect::<Vec<SessionId>>()
.into_iter()
.for_each(|i| self.session_close(i, Source::External));
self.state.pre_shutdown();
while let Some((address, incoming)) = self.listens.pop() {
drop(incoming);
self.handle.handle_event(
&mut self.service_context,
ServiceEvent::ListenClose { address },
)
}
self.pending_tasks.clear();
}
}
}

Expand Down Expand Up @@ -1492,7 +1508,7 @@ where

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.listens.is_empty()
&& self.task_count == 0
&& self.state.is_shutdown()
&& self.sessions.is_empty()
&& self.pending_tasks.is_empty()
{
Expand Down Expand Up @@ -1542,16 +1558,16 @@ where

// Double check service state
if self.listens.is_empty()
&& self.task_count == 0
&& self.state.is_shutdown()
&& self.sessions.is_empty()
&& self.pending_tasks.is_empty()
{
return Ok(Async::Ready(None));
}
debug!(
"listens count: {}, task_count: {}, sessions count: {}, pending task: {}",
"listens count: {}, state: {:?}, sessions count: {}, pending task: {}",
self.listens.len(),
self.task_count,
self.state,
self.sessions.len(),
self.pending_tasks.len(),
);
Expand Down
93 changes: 93 additions & 0 deletions src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,96 @@ impl<T> ProtocolHandle<T> {
self.is_event() || self.is_both()
}
}

/// Service state
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum State {
/// Calculate the number of connection requests that need to be sent externally
Running(usize),
Forever,
PreShutdown,
}

impl State {
/// new
pub fn new(forever: bool) -> Self {
if forever {
State::Forever
} else {
State::Running(0)
}
}

/// Can it be shutdown?
#[inline]
pub fn is_shutdown(&self) -> bool {
match self {
State::Running(num) if num == &0 => true,
State::PreShutdown => true,
State::Running(_) | State::Forever => false,
}
}

/// Convert to pre shutdown state
#[inline]
pub fn pre_shutdown(&mut self) {
*self = State::PreShutdown
}

/// Add one task count
#[inline]
pub fn increase(&mut self) {
match self {
State::Running(num) => *num += 1,
State::PreShutdown | State::Forever => (),
}
}

/// Reduce one task count
#[inline]
pub fn decrease(&mut self) {
match self {
State::Running(num) => *num -= 1,
State::PreShutdown | State::Forever => (),
}
}
}

#[cfg(test)]
mod test {
use super::State;

#[test]
fn test_state_no_forever() {
let mut state = State::new(false);
state.increase();
state.increase();
assert_eq!(state, State::Running(2));
state.decrease();
state.decrease();
assert_eq!(state, State::Running(0));
state.increase();
state.increase();
state.increase();
state.increase();
state.pre_shutdown();
assert_eq!(state, State::PreShutdown);
}

#[test]
fn test_state_forever() {
let mut state = State::new(true);
state.increase();
state.increase();
assert_eq!(state, State::Forever);
state.decrease();
state.decrease();
assert_eq!(state, State::Forever);
state.increase();
state.increase();
state.increase();
state.increase();
state.pre_shutdown();
assert_eq!(state, State::PreShutdown);
}
}
11 changes: 11 additions & 0 deletions src/service/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,15 @@ impl ServiceControl {
token,
})
}

/// Shutdown service
///
/// Order:
/// 1. close all listens
/// 2. try close all session's protocol stream
/// 3. try close all session
/// 4. close service
pub fn shutdown(&mut self) -> Result<(), Error> {
self.send(ServiceTask::Shutdown)
}
}
3 changes: 3 additions & 0 deletions src/service/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ pub(crate) enum ServiceTask {
/// Listen address
address: Multiaddr,
},
/// Shutdown service
Shutdown,
}

impl fmt::Debug for ServiceTask {
Expand Down Expand Up @@ -305,6 +307,7 @@ impl fmt::Debug for ServiceTask {
session_id,
proto_id,
} => write!(f, "Close session [{}] proto [{}]", session_id, proto_id),
Shutdown => write!(f, "Try close service"),
}
}
}
Loading

0 comments on commit de913e8

Please sign in to comment.