From 6dd800254e3a7e6ec09d239171cf582eca657b2a Mon Sep 17 00:00:00 2001 From: piaoliu <441594700@qq.com> Date: Mon, 1 Apr 2019 22:23:21 +0800 Subject: [PATCH] Fix close --- src/service.rs | 4 ++-- src/session.rs | 21 +++++++++++++-------- src/substream.rs | 16 +++++++++++----- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/service.rs b/src/service.rs index 86a40e7e..7780d4b5 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1152,7 +1152,7 @@ where Err(timer::Error::shutdown()) } }) - .map_err(|err| warn!("{}", err)); + .map_err(|err| debug!("notify close by: {}", err)); // If set more than once, the older task will stop when sender dropped self.service_notify_signals @@ -1203,7 +1203,7 @@ where Err(timer::Error::shutdown()) } }) - .map_err(|err| warn!("{}", err)); + .map_err(|err| debug!("session notify close by: {}", err)); // If set more than once, the older task will stop when sender dropped if let Some(session) = self.sessions.get_mut(&session_id) { diff --git a/src/session.rs b/src/session.rs index 60d75f1d..56d885f5 100644 --- a/src/session.rs +++ b/src/session.rs @@ -223,7 +223,7 @@ where version, }); tokio::spawn(send_task.map(|_| ()).map_err(|err| { - error!("stream send back error: {:?}", err); + debug!("stream send back error: {:?}", err); })); } None => { @@ -232,7 +232,7 @@ where proto_name: Some(name), }); tokio::spawn(send_task.map(|_| ()).map_err(|err| { - error!("select error send back error: {:?}", err); + debug!("select error send back error: {:?}", err); })); } }, @@ -241,7 +241,7 @@ where let send_task = event_sender.send(ProtocolEvent::SelectError { proto_name: None }); tokio::spawn(send_task.map(|_| ()).map_err(|err| { - error!("select error send back error: {:?}", err); + debug!("select error send back error: {:?}", err); })); } } @@ -298,7 +298,7 @@ where self.write_buf.push_back(e.into_inner()); self.notify(); } else { - error!("session send to sub stream error: {}", e); + debug!("session send to sub stream error: {}", e); } } }; @@ -310,7 +310,7 @@ where self.write_buf.push_back(e.into_inner()); self.notify(); } else { - error!("session send to sub stream error: {}", e); + debug!("session send to sub stream error: {}", e); } } }; @@ -483,9 +483,14 @@ where /// Close session fn close_session(&mut self) { - let _ = self - .service_sender - .try_send(SessionEvent::SessionClose { id: self.id }); + tokio::spawn( + self.service_sender + .clone() + .send(SessionEvent::SessionClose { id: self.id }) + .map(|_| ()) + .map_err(|e| error!("session close event send to service error: {:?}", e)), + ); + self.sub_streams.clear(); self.service_receiver.close(); self.proto_event_receiver.close(); diff --git a/src/substream.rs b/src/substream.rs index 5f1ee6f5..fe372243 100644 --- a/src/substream.rs +++ b/src/substream.rs @@ -135,10 +135,16 @@ where fn close_proto_stream(&mut self) { self.event_receiver.close(); let _ = self.sub_stream.get_mut().shutdown(); - self.output_event(ProtocolEvent::Close { - id: self.id, - proto_id: self.proto_id, - }); + tokio::spawn( + self.event_sender + .clone() + .send(ProtocolEvent::Close { + id: self.id, + proto_id: self.proto_id, + }) + .map(|_| ()) + .map_err(|e| debug!("stream close event send to session error: {:?}", e)), + ); } /// Handling commands send by session @@ -152,7 +158,7 @@ where // Whether it is a read send error or a flush error, // the most essential problem is that there is a problem with the external network. // Close the protocol stream directly. - warn!( + debug!( "protocol [{}] close because of extern network", self.proto_id );