Skip to content

Commit

Permalink
Add the DirectService trait (#118)
Browse files Browse the repository at this point in the history
This patch adds the `DirectService` trait, and related implementations
over it in `tower_balance` and `tower_buffer`. `DirectService` is
similar to a `Service`, but must be "driven" through calls to
`poll_service` for the futures returned by `call` to make progress.

The motivation behind adding this trait is that many current `Service`
implementations spawn long-running futures when the service is created,
which then drive the work necessary to turn requests into responses. A
simple example of this is a service that writes requests over a
`TcpStream` and reads responses over that same `TcpStream`. The
underlying stream must be read from to discover new responses, but there
is no single entity to drive that task. The returned futures would share
access to the stream (and worse yet, may get responses out of order),
and then service itself is not guaranteed to see any more calls to it as
the client is waiting for its requests to finish.

`DirectService` solves this by introducing a new method, `poll_service`,
which must be called to make progress on in-progress futures.
Furthermore, like `Future::poll`, `poll_service` must be called whenever
the associated task is notified so that the service can also respect
time-based operations like heartbeats.

The PR includes changes to both `tower_balance::Balance` and
`tower_buffer::Buffer` to add support for wrapping `DirectService`s. For
`Balance` this is straightforward: if the inner service is a `Service`,
the `Balance` also implements `Service`; if the inner service is a
`DirectService`, the `Balance` is itself also a `DirectService`. For
`Buffer`, this is more involved, as a `Buffer` turns any `DirectService`
*into* a `Service`. The `Buffer`'s `Worker` is spawned, and will
therefore drive the wrapped `DirectService`.

One complication arises in that `Buffer<T>` requires that `T: Service`,
but you can safely construct a `Buffer` over a `DirectService` per the
above. `Buffer` works around this by exposing

```rust
impl Service for HandleTo<S> where S: DirectService {}
```

And giving out `Buffer<HandleTo<S>>` when the `new_directed(s: S)`
constructor is invoked. Since `Buffer` never calls any methods on the
service it wraps, `HandleTo`'s implementation just consists of calls to
`unreachable!()`.

Note that `tower_buffer` now also includes a `DirectedService` type,
which is a wrapper around a `Service` that implements `DirectService`.
In theory, we could do away with this by adding a blanket impl:

```rust
impl<T> DirectedService for T where T: Service {}
```

but until we have specialization, this would prevent downstream users
from implementing `DirectService` themselves.

Finally, this also makes `Buffer` use a bounded mpsc channel, which
introduces a new capacity argument to `Buffer::new`.

Fixes #110.
  • Loading branch information
jonhoo authored and carllerche committed Nov 19, 2018
1 parent b8c1590 commit 9bae225
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 75 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
members = [
"tower-balance",
"tower-buffer",
"tower-direct-service",
"tower-discover",
"tower-filter",
"tower-in-flight-limit",
Expand Down
1 change: 1 addition & 0 deletions tower-balance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ log = "0.4.1"
rand = "0.5"
tokio-timer = "0.2.4"
tower-service = { version = "0.1", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
tower-discover = { version = "0.1", path = "../tower-discover" }
indexmap = "1"

Expand Down
6 changes: 1 addition & 5 deletions tower-balance/examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,7 @@ impl Discover for Disco {
}
}

type DemoService<D, C> =
InFlightLimit<
Buffer<
lb::Balance<D, C>,
Req>>;
type DemoService<D, C> = InFlightLimit<Buffer<lb::Balance<D, C>, Req>>;

struct SendRequests<D, C>
where
Expand Down
167 changes: 129 additions & 38 deletions tower-balance/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![deny(dead_code)]

#[macro_use]
extern crate futures;
#[macro_use]
Expand All @@ -11,14 +9,16 @@ extern crate rand;
extern crate tokio_timer;
extern crate tower_discover;
extern crate tower_service;
extern crate tower_direct_service;

use futures::{Async, Future, Poll};
use indexmap::IndexMap;
use rand::{SeedableRng, rngs::SmallRng};
use rand::{rngs::SmallRng, SeedableRng};
use std::{fmt, error};
use std::marker::PhantomData;
use tower_discover::Discover;
use tower_service::Service;
use tower_direct_service::DirectService;

pub mod choose;
pub mod load;
Expand Down Expand Up @@ -156,11 +156,7 @@ where
/// Polls `discover` for updates, adding new items to `not_ready`.
///
/// Removals may alter the order of either `ready` or `not_ready`.
fn update_from_discover<Request>(&mut self)
-> Result<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
where
D::Service: Service<Request>
{
fn update_from_discover<E>(&mut self) -> Result<(), Error<E, D::Error>> {
debug!("updating from discover");
use tower_discover::Change::*;

Expand All @@ -182,6 +178,7 @@ where
};
// XXX is it safe to just drop the Service? Or do we need some sort of
// graceful teardown?
// TODO: poll_close
}
}
}
Expand All @@ -193,10 +190,9 @@ where
///
/// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted
/// into `ready`, potentially altering the order of `ready` and/or `not_ready`.
fn promote_to_ready<Request>(&mut self)
-> Result<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
fn promote_to_ready<F, E>(&mut self, mut poll_ready: F) -> Result<(), Error<E, D::Error>>
where
D::Service: Service<Request>,
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
let n = self.not_ready.len();
if n == 0 {
Expand All @@ -212,7 +208,7 @@ where
let (_, svc) = self.not_ready
.get_index_mut(idx)
.expect("invalid not_ready index");;
svc.poll_ready().map_err(Error::Inner)?.is_ready()
poll_ready(svc).map_err(Error::Inner)?.is_ready()
};
trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
if is_ready {
Expand All @@ -235,15 +231,18 @@ where
///
/// If the service exists in `ready` and does not poll as ready, it is moved to
/// `not_ready`, potentially altering the order of `ready` and/or `not_ready`.
fn poll_ready_index<Request>(&mut self, idx: usize)
-> Option<Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>>
fn poll_ready_index<F, E>(
&mut self,
idx: usize,
mut poll_ready: F,
) -> Option<Poll<(), Error<E, D::Error>>>
where
D::Service: Service<Request>,
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
match self.ready.get_index_mut(idx) {
None => return None,
Some((_, svc)) => {
match svc.poll_ready() {
match poll_ready(svc) {
Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
Err(e) => return Some(Err(Error::Inner(e))),
Ok(Async::NotReady) => {}
Expand All @@ -259,10 +258,9 @@ where
/// Chooses the next service to which a request will be dispatched.
///
/// Ensures that .
fn choose_and_poll_ready<Request>(&mut self)
-> Poll<(), Error<<D::Service as Service<Request>>::Error, D::Error>>
fn choose_and_poll_ready<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
where
D::Service: Service<Request>,
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
loop {
let n = self.ready.len();
Expand All @@ -277,12 +275,52 @@ where
};

// XXX Should we handle per-endpoint errors?
if self.poll_ready_index(idx).expect("invalid ready index")?.is_ready() {
if self
.poll_ready_index(idx, &mut poll_ready)
.expect("invalid ready index")?
.is_ready()
{
self.chosen_ready_index = Some(idx);
return Ok(Async::Ready(()));
}
}
}

fn poll_ready_inner<F, E>(&mut self, mut poll_ready: F) -> Poll<(), Error<E, D::Error>>
where
F: FnMut(&mut D::Service) -> Poll<(), E>,
{
// Clear before `ready` is altered.
self.chosen_ready_index = None;

// Before `ready` is altered, check the readiness of the last-used service, moving it
// to `not_ready` if appropriate.
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx, &mut poll_ready)
.expect("invalid dispatched ready key")?;
}

// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready(&mut poll_ready)?;

// Choose the next service to be used by `call`.
self.choose_and_poll_ready(&mut poll_ready)
}

fn call<Request, F, FF>(&mut self, call: F, request: Request) -> ResponseFuture<FF, D::Error>
where
F: FnOnce(&mut D::Service, Request) -> FF,
FF: Future,
{
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self.ready.get_index_mut(idx).expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);

let rsp = call(svc, request);
ResponseFuture(rsp, PhantomData)
}
}

impl<D, C, Request> Service<Request> for Balance<D, C>
Expand All @@ -300,31 +338,84 @@ where
/// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index
/// into `ready` referring to a `Service` that is ready to disptach a request.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// Clear before `ready` is altered.
self.chosen_ready_index = None;
self.poll_ready_inner(D::Service::poll_ready)
}

// Before `ready` is altered, check the readiness of the last-used service, moving it
// to `not_ready` if appropriate.
if let Some(idx) = self.dispatched_ready_index.take() {
// XXX Should we handle per-endpoint errors?
self.poll_ready_index(idx).expect("invalid dispatched ready key")?;
}
fn call(&mut self, request: Request) -> Self::Future {
self.call(D::Service::call, request)
}
}

// Update `not_ready` and `ready`.
self.update_from_discover()?;
self.promote_to_ready()?;
impl<D, C, Request> DirectService<Request> for Balance<D, C>
where
D: Discover,
D::Service: DirectService<Request>,
C: Choose<D::Key, D::Service>,
{
type Response = <D::Service as DirectService<Request>>::Response;
type Error = Error<<D::Service as DirectService<Request>>::Error, D::Error>;
type Future = ResponseFuture<<D::Service as DirectService<Request>>::Future, D::Error>;

// Choose the next service to be used by `call`.
self.choose_and_poll_ready()
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.poll_ready_inner(D::Service::poll_ready)
}

fn call(&mut self, request: Request) -> Self::Future {
let idx = self.chosen_ready_index.take().expect("not ready");
let (_, svc) = self.ready.get_index_mut(idx).expect("invalid chosen ready index");
self.dispatched_ready_index = Some(idx);
self.call(D::Service::call, request)
}

let rsp = svc.call(request);
ResponseFuture(rsp, PhantomData)
fn poll_service(&mut self) -> Poll<(), Self::Error> {
let mut any_not_ready = false;

// TODO: don't re-poll services that return Ready until call is invoked on them

for (_, svc) in &mut self.ready {
if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
any_not_ready = true;
}
}

for (_, svc) in &mut self.not_ready {
if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
any_not_ready = true;
}
}

if any_not_ready {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}

fn poll_close(&mut self) -> Poll<(), Self::Error> {
let mut err = None;
self.ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});
self.not_ready.retain(|_, svc| match svc.poll_close() {
Ok(Async::Ready(())) => return false,
Ok(Async::NotReady) => return true,
Err(e) => {
err = Some(e);
return false;
}
});

if let Some(e) = err {
return Err(Error::Inner(e));
}

if self.ready.is_empty() && self.not_ready.is_empty() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions tower-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ publish = false
[dependencies]
futures = "0.1"
tower-service = { version = "0.1", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }

[dev-dependencies]
tower-mock = { version = "0.1", path = "../tower-mock" }
Loading

0 comments on commit 9bae225

Please sign in to comment.