From ef4c08c9e99f683356abeb6387e2409c6246edf3 Mon Sep 17 00:00:00 2001
From: Joe Wilm
Date: Tue, 11 Oct 2016 14:20:30 -0700
Subject: [PATCH 1/5] fix(client): Handle connection error

Previously, the connection would be kept alive if there was a connect
error. Now it's closed immediately.
---
 src/client/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/client/mod.rs b/src/client/mod.rs
index b82904973b..fb21c09b6b 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -483,11 +483,11 @@ where C: Connect,
                 if let Some(err) = seed.1.take_socket_error().err() {
                     debug!("error while connecting: {:?}", err);
                     scope.pop_queue(&seed.0).map(move |mut queued| queued.handler.on_error(::Error::Io(err)));
-                    rotor::Response::done()
                 } else {
                     trace!("connecting is_error, but no socket error");
-                    rotor::Response::ok(ClientFsm::Connecting(seed))
                 }
+
+                rotor::Response::done()
             } else if events.is_writable() {
                 if scope.queue.contains_key(&seed.0) {
                     trace!("connected and writable {:?}", seed.0);

From 34c4d72712ca8193df778a51ed556673f016bedb Mon Sep 17 00:00:00 2001
From: Joe Wilm
Date: Tue, 11 Oct 2016 21:02:26 -0700
Subject: [PATCH 2/5] fix(client): Always register for hup
---
 src/client/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/client/mod.rs b/src/client/mod.rs
index fb21c09b6b..90e66579db 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -372,7 +372,7 @@ macro_rules! conn_response {
             }
             None => {
                 if let Some((key, socket)) = $scope.awaiting_slot.pop_front() {
-                    rotor_try!($scope.register(&socket, EventSet::writable(), PollOpt::level()));
+                    rotor_try!($scope.register(&socket, EventSet::writable() | EventSet::hup(), PollOpt::level()));
                     rotor::Response::ok(ClientFsm::Connecting((key, socket)))
                 } else {
                     rotor::Response::done()
@@ -460,7 +460,7 @@ where C: Connect,
     type Seed = (C::Key, C::Output);
 
     fn create(seed: Self::Seed, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, rotor::Void> {
-        rotor_try!(scope.register(&seed.1, EventSet::writable(), PollOpt::level()));
+        rotor_try!(scope.register(&seed.1, EventSet::writable() | EventSet::hup(), PollOpt::level()));
         rotor::Response::ok(ClientFsm::Connecting(seed))
     }
 

From 915af2a3a85094d7882f8950a557cce62e18530e Mon Sep 17 00:00:00 2001
From: Joe Wilm
Date: Tue, 11 Oct 2016 19:25:58 -0700
Subject: [PATCH 3/5] fix(http): Handle readable in keep-alive

It's possible that a connection will be closed and the only way to find
out is by doing a read. The keep-alive state (State::Init + Next_::Wait)
now tries to read on readable. On EOF, it returns State::Closed. If
bytes are actually available, it's a connection error, and the
connection is closed. Otherwise, it's just a spurious wakeup.
---
 src/http/conn.rs | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/src/http/conn.rs b/src/http/conn.rs
index 3f486c52d4..c7b57581fb 100644
--- a/src/http/conn.rs
+++ b/src/http/conn.rs
@@ -236,6 +236,29 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
                     }
                 }
             },
+            State::Init { interest: Next_::Wait, .. } => {
+                match self.buf.read_from(&mut self.transport) {
+                    Ok(0) => {
+                        // End-of-file; connection was closed by peer
+                        State::Closed
+                    },
+                    Ok(n) => {
+                        // Didn't expect bytes here! Close the connection.
+                        warn!("read {} bytes in State::Init with Wait interest", n);
+                        State::Closed
+                    },
+                    Err(e) => match e.kind() {
+                        io::ErrorKind::WouldBlock => {
+                            // This is the expected case reading in this state
+                            state
+                        },
+                        _ => {
+                            warn!("socket error reading State::Init with Wait interest: {}", e);
+                            State::Closed
+                        }
+                    }
+                }
+            },
             State::Init { .. } => {
                 trace!("on_readable State::{:?}", state);
                 state

From ff556199e6823432b07a20cb4816bf2a626ed52d Mon Sep 17 00:00:00 2001
From: Joe Wilm
Date: Wed, 12 Oct 2016 16:39:41 -0700
Subject: [PATCH 4/5] fix(client): Improve keep-alive reuse strategy

The previous keep-alive strategy was to cycle connections in a
round-robin style. However, that will always keep more connections
around than are needed. This new strategy allows extra connections to
expire when only a few are needed. This is accomplished by preferring
to reuse a connection that was just released to the pool over one that
has been there for a long time.
---
 src/client/mod.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/client/mod.rs b/src/client/mod.rs
index 90e66579db..a779c29dee 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -541,6 +541,7 @@ where C: Connect,
 
         // Check all idle connections regardless of origin
         for (key, idle) in scope.idle_conns.iter_mut() {
+            // Pop from the front since those are least recently used
            while let Some(ctrl) = idle.pop_front() {
                 // Signal connection to close. An err here means the
                 // socket is already dead and should be tossed.
@@ -667,7 +668,9 @@ where C: Connect,
         let mut remove_idle = false;
         let mut woke_up = false;
         if let Some(mut idle) = scope.idle_conns.get_mut(&key) {
-            while let Some(ctrl) = idle.pop_front() {
+            // Pop from back since those are most recently used. Connections
+            // at the front are allowed to expire.
+            while let Some(ctrl) = idle.pop_back() {
                 // err means the socket has since died
                 if ctrl.ready(Next::write()).is_ok() {
                     woke_up = true;

From 20fac49aa91307b6c60b43b6f3b4657e4621c93c Mon Sep 17 00:00:00 2001
From: Joe Wilm
Date: Thu, 13 Oct 2016 10:02:59 -0700
Subject: [PATCH 5/5] fix(http): Prevent busy looping

We encountered some issues where `Conn::ready()` would busy-loop on
reads. Previously, `ConnInner::can_read_more()` did not consider
whether the previous read got a WouldBlock error, nor whether the
transport was blocked. Accounting for this additional state fixes the
busy-loop problem.
---
 src/http/conn.rs | 27 +++++++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/src/http/conn.rs b/src/http/conn.rs
index c7b57581fb..05792d5c5e 100644
--- a/src/http/conn.rs
+++ b/src/http/conn.rs
@@ -37,6 +37,10 @@ struct ConnInner<K: Key, T: Transport, H: MessageHandler<T>> {
     key: K,
     state: State<H, T>,
     transport: T,
+    /// Records a WouldBlock error when trying to read
+    ///
+    /// This flag is used to prevent busy looping
+    read_would_block: bool,
 }
 
 impl<K: Key, T: Transport, H: MessageHandler<T>> fmt::Debug for ConnInner<K, T, H> {
@@ -153,7 +157,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
                     Ok(head) => head,
                     Err(::Error::Io(e)) => match e.kind() {
                         io::ErrorKind::WouldBlock |
-                        io::ErrorKind::Interrupted => return state,
+                        io::ErrorKind::Interrupted => {
+                            self.read_would_block = true;
+                            return state;
+                        },
                         _ => {
                             debug!("io error trying to parse {:?}", e);
                             return State::Closed;
@@ -250,6 +257,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
                    Err(e) => match e.kind() {
                         io::ErrorKind::WouldBlock => {
                             // This is the expected case reading in this state
+                            self.read_would_block = true;
                             state
                         },
                         _ => {
@@ -288,7 +296,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
                 },
                 Err(::Error::Io(e)) => match e.kind() {
                     io::ErrorKind::WouldBlock |
-                    io::ErrorKind::Interrupted => None,
+                    io::ErrorKind::Interrupted => {
+                        self.read_would_block = true;
+                        None
+                    },
                     _ => {
                         debug!("io error trying to parse {:?}", e);
                         return State::Closed;
@@ -482,10 +493,15 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
     }
 
     fn can_read_more(&self, was_init: bool) -> bool {
-        match self.state {
+        let transport_blocked = self.transport.blocked().is_some();
+        let read_would_block = self.read_would_block;
+
+        let state_machine_ok = match self.state {
             State::Init { .. } => !was_init && !self.buf.is_empty(),
             _ => !self.buf.is_empty()
-        }
+        };
+
+        !transport_blocked && !read_would_block && state_machine_ok
     }
 
     fn on_error<F>(&mut self, err: ::Error, factory: &F) where F: MessageHandlerFactory<K, T, Output=H> {
@@ -501,6 +517,8 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
 
     fn on_readable<F>(&mut self, scope: &mut Scope<F>)
     where F: MessageHandlerFactory<K, T, Output=H> {
+        // Clear would_block flag so state is clear going into read
+        self.read_would_block = false;
         trace!("on_readable -> {:?}", self.state);
         let state = mem::replace(&mut self.state, State::Closed);
         self.state = self.read(scope, state);
@@ -549,6 +567,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
                 timeout_start: Some(now),
             },
             transport: transport,
+            read_would_block: false,
         }))
     }
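
To see the idea behind the final patch in isolation, the sketch below records a
WouldBlock result during a read and consults that flag before reporting that
another read is worthwhile. The Reader type and its method names here are
invented for illustration and are not hyper's API; the real can_read_more()
additionally checks whether the transport itself reported being blocked, and
the real error path transitions the connection state machine to State::Closed.

use std::io::{self, Read};

/// Illustrative stand-in for a connection: buffers whatever the
/// transport yields and remembers when a read hit WouldBlock.
struct Reader<T: Read> {
    transport: T,
    buf: Vec<u8>,
    read_would_block: bool,
}

impl<T: Read> Reader<T> {
    fn on_readable(&mut self) {
        // Clear the flag first so stale state from an earlier wakeup
        // cannot suppress this one.
        self.read_would_block = false;
        let mut chunk = [0u8; 4096];
        match self.transport.read(&mut chunk) {
            Ok(n) => self.buf.extend_from_slice(&chunk[..n]),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // The transport has no more data right now; remember that
                // so the caller stops polling until the next readiness event.
                self.read_would_block = true;
            }
            Err(_) => { /* a real connection would move to a closed state */ }
        }
    }

    /// Without the would-block check, a caller looping on this could
    /// spin even though the last read produced nothing.
    fn can_read_more(&self) -> bool {
        !self.read_would_block && !self.buf.is_empty()
    }
}

fn main() {
    // Cursor never returns WouldBlock, so one read buffers everything.
    let mut r = Reader {
        transport: io::Cursor::new(b"hello".to_vec()),
        buf: Vec::new(),
        read_would_block: false,
    };
    r.on_readable();
    assert!(r.can_read_more()); // bytes buffered, no WouldBlock recorded
}

The same flag-plus-gate shape is what the patch adds to ConnInner: every read
path that observes WouldBlock sets the flag, on_readable() clears it before
reading, and can_read_more() refuses to loop while it is set.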