Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support gracefully shutting down the service... #105

Merged
merged 2 commits into from
Apr 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion secio/src/handshake/handshake_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::peer_id::PeerId;
use flatbuffers::FlatBufferBuilder;
use flatbuffers_verifier::get_root;

use std::fmt;

#[derive(Clone, Default, PartialEq, Ord, PartialOrd, Eq, Debug)]
pub struct Propose {
pub(crate) rand: Vec<u8>,
Expand Down Expand Up @@ -107,7 +109,7 @@ impl Exchange {
}

/// Public Key
#[derive(Clone, Debug, PartialEq, Ord, PartialOrd, Eq, Hash)]
#[derive(Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub enum PublicKey {
/// Secp256k1
Secp256k1(Vec<u8>),
Expand Down Expand Up @@ -153,6 +155,16 @@ impl PublicKey {
}
}

impl fmt::Debug for PublicKey {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "0x")?;
for byte in self.inner_ref() {
write!(f, "{:02x}", byte)?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::{Exchange, Propose, PublicKey};
Expand Down
13 changes: 13 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,19 @@ impl ServiceContext {
}
}

/// Shutdown service.
///
/// Order:
/// 1. close all listens
/// 2. try close all session's protocol stream
/// 3. try close all session
/// 4. close service
pub fn shutdown(&mut self) {
if self.inner.shutdown().is_err() {
warn!("Service is abnormally closed")
}
}

pub(crate) fn clone_self(&self) -> Self {
ServiceContext {
inner: self.inner.clone(),
Expand Down
44 changes: 28 additions & 16 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
protocol_select::ProtocolInfo,
secio::{handshake::Config, PublicKey, SecioKeyPair},
service::{
config::ServiceConfig,
config::{ServiceConfig, State},
event::ServiceTask,
future_task::{BoxedFutureTask, FutureTaskManager},
},
Expand Down Expand Up @@ -66,9 +66,8 @@ pub struct Service<T> {

dial_protocols: HashMap<Multiaddr, DialProtocol>,
config: ServiceConfig,
/// Calculate the number of connection requests that need to be sent externally,
/// if run forever, it will default to 1, else it default to 0
task_count: usize,
/// service state
state: State,

next_session: SessionId,

Expand Down Expand Up @@ -153,7 +152,7 @@ where
listens: Vec::new(),
dial_protocols: HashMap::default(),
config,
task_count: if forever { 1 } else { 0 },
state: State::new(forever),
next_session: 0,
write_buf: VecDeque::default(),
read_service_buf: VecDeque::default(),
Expand Down Expand Up @@ -228,7 +227,7 @@ where
self.pending_tasks.push_back(ServiceTask::FutureTask {
task: Box::new(task),
});
self.task_count += 1;
self.state.add();
Ok(listen_addr)
}

Expand Down Expand Up @@ -284,7 +283,7 @@ where
self.pending_tasks.push_back(ServiceTask::FutureTask {
task: Box::new(task),
});
self.task_count += 1;
self.state.add();
Ok(())
}

Expand Down Expand Up @@ -693,7 +692,7 @@ where
H: AsyncRead + AsyncWrite + Send + 'static,
{
if ty.is_outbound() {
self.task_count -= 1;
self.state.minus();
}
let target = self
.dial_protocols
Expand Down Expand Up @@ -1118,7 +1117,7 @@ where
}
SessionEvent::HandshakeFail { ty, error, address } => {
if ty.is_outbound() {
self.task_count -= 1;
self.state.minus();
self.dial_protocols.remove(&address);
self.handle.handle_error(
&mut self.service_context,
Expand Down Expand Up @@ -1161,15 +1160,15 @@ where
},
),
SessionEvent::DialError { address, error } => {
self.task_count -= 1;
self.state.minus();
self.dial_protocols.remove(&address);
self.handle.handle_error(
&mut self.service_context,
ServiceError::DialerError { address, error },
)
}
SessionEvent::ListenError { address, error } => {
self.task_count -= 1;
self.state.minus();
self.handle.handle_error(
&mut self.service_context,
ServiceError::ListenError { address, error },
Expand Down Expand Up @@ -1207,7 +1206,7 @@ where
},
);
self.listens.push((listen_address, incoming));
self.task_count -= 1;
self.state.minus();
self.update_listens();
self.listen_poll();
}
Expand Down Expand Up @@ -1430,6 +1429,19 @@ where
session_id,
proto_id,
} => self.protocol_close(session_id, proto_id, Source::External),
ServiceTask::Shutdown => {
let ids = self.sessions.keys().cloned().collect::<Vec<SessionId>>();
ids.into_iter()
.for_each(|i| self.session_close(i, Source::External));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.sessions
    .keys()
    .cloned()
    .for_each(|i| self.session_close(i, Source::External));

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Borrow check is attacking 😭

self.state.pre_shutdown();
while let Some((address, _)) = self.listens.pop() {
self.handle.handle_event(
&mut self.service_context,
ServiceEvent::ListenClose { address },
)
}
self.pending_tasks.clear();
}
}
}

Expand Down Expand Up @@ -1492,7 +1504,7 @@ where

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.listens.is_empty()
&& self.task_count == 0
&& self.state.is_shutdown()
&& self.sessions.is_empty()
&& self.pending_tasks.is_empty()
{
Expand Down Expand Up @@ -1542,16 +1554,16 @@ where

// Double check service state
if self.listens.is_empty()
&& self.task_count == 0
&& self.state.is_shutdown()
&& self.sessions.is_empty()
&& self.pending_tasks.is_empty()
{
return Ok(Async::Ready(None));
}
debug!(
"listens count: {}, task_count: {}, sessions count: {}, pending task: {}",
"listens count: {}, state: {:?}, sessions count: {}, pending task: {}",
self.listens.len(),
self.task_count,
self.state,
self.sessions.len(),
self.pending_tasks.len(),
);
Expand Down
78 changes: 78 additions & 0 deletions src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,81 @@ impl<T> ProtocolHandle<T> {
self.is_event() || self.is_both()
}
}

/// Service state
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum State {
/// Calculate the number of connection requests that need to be sent externally,
/// if run forever, it will default to 1, else it default to 0
Running(usize),
PreShutdown,
}

impl State {
/// new
pub fn new(forever: bool) -> Self {
if forever {
State::Running(1)
} else {
State::Running(0)
}
}

/// Can it be shutdown?
#[inline]
pub fn is_shutdown(&self) -> bool {
match self {
State::Running(num) if num == &0 => true,
State::PreShutdown => true,
State::Running(_) => false,
}
}

/// Convert to pre shutdown state
#[inline]
pub fn pre_shutdown(&mut self) {
*self = State::PreShutdown
}

/// Add one task count
#[inline]
pub fn add(&mut self) {
match self {
State::Running(num) => *num += 1,
State::PreShutdown => (),
}
}

/// Reduce one task count
#[inline]
pub fn minus(&mut self) {
match self {
State::Running(num) => *num -= 1,
State::PreShutdown => (),
}
}
}

#[cfg(test)]
mod test {
use super::State;

#[test]
fn test_state() {
let mut state = State::new(true);
state.add();
state.add();
assert_eq!(state, State::Running(3));
state.minus();
state.minus();
assert_eq!(state, State::Running(1));
state.minus();
assert_eq!(state, State::Running(0));
state.add();
state.add();
state.add();
state.add();
state.pre_shutdown();
assert_eq!(state, State::PreShutdown);
}
}
11 changes: 11 additions & 0 deletions src/service/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,15 @@ impl ServiceControl {
token,
})
}

/// Shutdown service
///
/// Order:
/// 1. close all listens
/// 2. try close all session's protocol stream
/// 3. try close all session
/// 4. close service
pub fn shutdown(&mut self) -> Result<(), Error> {
self.send(ServiceTask::Shutdown)
}
}
3 changes: 3 additions & 0 deletions src/service/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ pub(crate) enum ServiceTask {
/// Listen address
address: Multiaddr,
},
/// Shutdown service
Shutdown,
}

impl fmt::Debug for ServiceTask {
Expand Down Expand Up @@ -305,6 +307,7 @@ impl fmt::Debug for ServiceTask {
session_id,
proto_id,
} => write!(f, "Close session [{}] proto [{}]", session_id, proto_id),
Shutdown => write!(f, "Try close service"),
}
}
}
Loading