diff --git a/src/client/mod.rs b/src/client/mod.rs index b82904973b..a779c29dee 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -372,7 +372,7 @@ macro_rules! conn_response { } None => { if let Some((key, socket)) = $scope.awaiting_slot.pop_front() { - rotor_try!($scope.register(&socket, EventSet::writable(), PollOpt::level())); + rotor_try!($scope.register(&socket, EventSet::writable() | EventSet::hup(), PollOpt::level())); rotor::Response::ok(ClientFsm::Connecting((key, socket))) } else { rotor::Response::done() @@ -460,7 +460,7 @@ where C: Connect, type Seed = (C::Key, C::Output); fn create(seed: Self::Seed, scope: &mut Scope) -> rotor::Response { - rotor_try!(scope.register(&seed.1, EventSet::writable(), PollOpt::level())); + rotor_try!(scope.register(&seed.1, EventSet::writable() | EventSet::hup(), PollOpt::level())); rotor::Response::ok(ClientFsm::Connecting(seed)) } @@ -483,11 +483,11 @@ where C: Connect, if let Some(err) = seed.1.take_socket_error().err() { debug!("error while connecting: {:?}", err); scope.pop_queue(&seed.0).map(move |mut queued| queued.handler.on_error(::Error::Io(err))); - rotor::Response::done() } else { trace!("connecting is_error, but no socket error"); - rotor::Response::ok(ClientFsm::Connecting(seed)) } + + rotor::Response::done() } else if events.is_writable() { if scope.queue.contains_key(&seed.0) { trace!("connected and writable {:?}", seed.0); @@ -541,6 +541,7 @@ where C: Connect, // Check all idle connections regardless of origin for (key, idle) in scope.idle_conns.iter_mut() { + // Pop from the front since those are lease recently used while let Some(ctrl) = idle.pop_front() { // Signal connection to close. An err here means the // socket is already dead can should be tossed. @@ -667,7 +668,9 @@ where C: Connect, let mut remove_idle = false; let mut woke_up = false; if let Some(mut idle) = scope.idle_conns.get_mut(&key) { - while let Some(ctrl) = idle.pop_front() { + // Pop from back since those are most recently used. Connections + // at the front are allowed to expire. + while let Some(ctrl) = idle.pop_back() { // err means the socket has since died if ctrl.ready(Next::write()).is_ok() { woke_up = true; diff --git a/src/http/conn.rs b/src/http/conn.rs index 3f486c52d4..05792d5c5e 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -37,6 +37,10 @@ struct ConnInner> { key: K, state: State, transport: T, + /// Records a WouldBlock error when trying to read + /// + /// This flag is used to prevent busy looping + read_would_block: bool, } impl> fmt::Debug for ConnInner { @@ -153,7 +157,10 @@ impl> ConnInner { Ok(head) => head, Err(::Error::Io(e)) => match e.kind() { io::ErrorKind::WouldBlock | - io::ErrorKind::Interrupted => return state, + io::ErrorKind::Interrupted => { + self.read_would_block = true; + return state; + }, _ => { debug!("io error trying to parse {:?}", e); return State::Closed; @@ -236,6 +243,30 @@ impl> ConnInner { } } }, + State::Init { interest: Next_::Wait, .. } => { + match self.buf.read_from(&mut self.transport) { + Ok(0) => { + // End-of-file; connection was closed by peer + State::Closed + }, + Ok(n) => { + // Didn't expect bytes here! Close the connection. + warn!("read {} bytes in State::Init with Wait interest", n); + State::Closed + }, + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => { + // This is the expected case reading in this state + self.read_would_block = true; + state + }, + _ => { + warn!("socket error reading State::Init with Wait interest: {}", e); + State::Closed + } + } + } + }, State::Init { .. } => { trace!("on_readable State::{:?}", state); state @@ -265,7 +296,10 @@ impl> ConnInner { }, Err(::Error::Io(e)) => match e.kind() { io::ErrorKind::WouldBlock | - io::ErrorKind::Interrupted => None, + io::ErrorKind::Interrupted => { + self.read_would_block = true; + None + }, _ => { debug!("io error trying to parse {:?}", e); return State::Closed; @@ -459,10 +493,15 @@ impl> ConnInner { } fn can_read_more(&self, was_init: bool) -> bool { - match self.state { + let transport_blocked = self.transport.blocked().is_some(); + let read_would_block = self.read_would_block; + + let state_machine_ok = match self.state { State::Init { .. } => !was_init && !self.buf.is_empty(), _ => !self.buf.is_empty() - } + }; + + !transport_blocked && !read_would_block && state_machine_ok } fn on_error(&mut self, err: ::Error, factory: &F) where F: MessageHandlerFactory { @@ -478,6 +517,8 @@ impl> ConnInner { fn on_readable(&mut self, scope: &mut Scope) where F: MessageHandlerFactory { + // Clear would_block flag so state is clear going into read + self.read_would_block = false; trace!("on_readable -> {:?}", self.state); let state = mem::replace(&mut self.state, State::Closed); self.state = self.read(scope, state); @@ -526,6 +567,7 @@ impl> Conn { timeout_start: Some(now), }, transport: transport, + read_would_block: false, })) }