-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: fix some msg left on buffer #288
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ use std::{ | |
use timer::Instant; | ||
|
||
use futures::{ | ||
channel::mpsc::{channel, Receiver, Sender}, | ||
channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender}, | ||
Sink, Stream, | ||
}; | ||
use log::debug; | ||
|
@@ -82,6 +82,9 @@ pub struct Session<T> { | |
event_sender: Sender<StreamEvent>, | ||
// For receive events from sub streams | ||
event_receiver: Receiver<StreamEvent>, | ||
// The control information must be sent successfully and will not cause memory problems | ||
unbound_event_sender: UnboundedSender<StreamEvent>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This channel is only used to send control information(window update and close), there is no possibility of unlimited increase |
||
unbound_event_receiver: UnboundedReceiver<StreamEvent>, | ||
|
||
/// use to async open stream/close session | ||
control_sender: Sender<Command>, | ||
|
@@ -122,6 +125,7 @@ where | |
SessionType::Server => 2, | ||
}; | ||
let (event_sender, event_receiver) = channel(32); | ||
let (unbound_event_sender, unbound_event_receiver) = unbounded(); | ||
let (control_sender, control_receiver) = channel(32); | ||
let framed_stream = Framed::new( | ||
raw_stream, | ||
|
@@ -149,6 +153,8 @@ where | |
read_pending_frames: VecDeque::default(), | ||
event_sender, | ||
event_receiver, | ||
unbound_event_sender, | ||
unbound_event_receiver, | ||
control_sender, | ||
control_receiver, | ||
keepalive, | ||
|
@@ -280,12 +286,13 @@ where | |
let mut stream = StreamHandle::new( | ||
stream_id, | ||
self.event_sender.clone(), | ||
self.unbound_event_sender.clone(), | ||
frame_receiver, | ||
state, | ||
self.config.max_stream_window_size, | ||
self.config.max_stream_window_size, | ||
); | ||
if let Err(err) = stream.send_window_update(None) { | ||
if let Err(err) = stream.send_window_update() { | ||
debug!("[{:?}] stream.send_window_update error={:?}", self.ty, err); | ||
} | ||
Ok(stream) | ||
|
@@ -516,11 +523,6 @@ where | |
self.streams.remove(&stream_id); | ||
} | ||
} | ||
StreamEvent::Flush(stream_id) => { | ||
debug!("[{}] session flushing.....", stream_id); | ||
self.flush(cx)?; | ||
debug!("[{}] session flushed", stream_id); | ||
} | ||
StreamEvent::GoAway => self.send_go_away_with_code(cx, GoAwayCode::ProtocolError)?, | ||
} | ||
Ok(()) | ||
|
@@ -543,6 +545,24 @@ where | |
} | ||
} | ||
|
||
fn recv_unbound_events(&mut self, cx: &mut Context) -> Poll<Option<Result<(), io::Error>>> { | ||
match Pin::new(&mut self.unbound_event_receiver) | ||
.as_mut() | ||
.poll_next(cx) | ||
{ | ||
Poll::Ready(Some(event)) => { | ||
self.handle_event(cx, event)?; | ||
Poll::Ready(Some(Ok(()))) | ||
} | ||
Poll::Ready(None) => { | ||
// Since session hold one event sender, | ||
// the channel can not be disconnected. | ||
unreachable!() | ||
} | ||
Poll::Pending => Poll::Pending, | ||
} | ||
} | ||
|
||
fn control_poll(&mut self, cx: &mut Context) -> Poll<Option<Result<(), io::Error>>> { | ||
match Pin::new(&mut self.control_receiver).as_mut().poll_next(cx) { | ||
Poll::Ready(Some(event)) => { | ||
|
@@ -609,6 +629,7 @@ where | |
|
||
let mut is_pending = self.control_poll(cx)?.is_pending(); | ||
is_pending &= self.recv_frames(cx)?.is_pending(); | ||
is_pending &= self.recv_unbound_events(cx)?.is_pending(); | ||
is_pending &= self.recv_events(cx)?.is_pending(); | ||
if is_pending { | ||
break; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the difference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No difference, may look better