Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): Dynamic load balancing #341

Merged
10 changes: 9 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ path = "src/load_balance/client.rs"
name = "load-balance-server"
path = "src/load_balance/server.rs"

[[bin]]
name = "dynamic-load-balance-client"
path = "src/dynamic_load_balance/client.rs"

[[bin]]
name = "dynamic-load-balance-server"
path = "src/dynamic_load_balance/server.rs"

[[bin]]
name = "tls-client"
path = "src/tls/client.rs"
Expand Down Expand Up @@ -123,7 +131,7 @@ serde_json = "1.0"
rand = "0.7"
# Tracing
tracing = "0.1"
tracing-subscriber = { version = "0.2.0-alpha", features = ["tracing-log"] }
tracing-subscriber = { version = "0.2", features = ["tracing-log"] }
tracing-attributes = "0.1"
tracing-futures = "0.2"
# Required for wellknown types
Expand Down
16 changes: 15 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ $ cargo run --bin authentication-client
$ cargo run --bin authentication-server
```

## Load balance
## Load Balance

### Client

Expand All @@ -58,6 +58,20 @@ $ cargo run --bin load-balance-client
$ cargo run --bin load-balance-server
```

## Dyanmic Load Balance

### Client

```bash
$ cargo run --bin dynamic-load-balance-client
```

### Server

```bash
$ cargo run --bin dynamic-load-balance-server
```

## TLS (rustls)

### Client
Expand Down
81 changes: 81 additions & 0 deletions examples/src/dynamic_load_balance/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}

use pb::{echo_client::EchoClient, EchoRequest};
use tonic::transport::Channel;

use tonic::transport::Endpoint;

use std::sync::Arc;

use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use tokio::time::timeout;
use tower::discover::Change;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let e1 = Endpoint::from_static("http://[::1]:50051");
let e2 = Endpoint::from_static("http://[::1]:50052");

let (channel, mut rx) = Channel::balance_channel(10);
let mut client = EchoClient::new(channel);

let done = Arc::new(AtomicBool::new(false));
let demo_done = done.clone();
tokio::spawn(async move {
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Added first endpoint");
let change = Change::Insert("1", e1);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Added second endpoint");
let change = Change::Insert("2", e2);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Removed first endpoint");
let change = Change::Remove("1");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Removed second endpoint");
let change = Change::Remove("2");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Added third endpoint");
let e3 = Endpoint::from_static("http://[::1]:50051");
let change = Change::Insert("3", e3);
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Removed third endpoint");
let change = Change::Remove("3");
let res = rx.send(change).await;
println!("{:?}", res);
demo_done.swap(true, SeqCst);
});

while !done.load(SeqCst) {
tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await;
let request = tonic::Request::new(EchoRequest {
message: "hello".into(),
});

let rx = client.unary_echo(request);
if let Ok(resp) = timeout(tokio::time::Duration::from_secs(10), rx).await {
println!("RESPONSE={:?}", resp);
} else {
println!("did not receive value within 10 secs");
}
}

println!("... Bye");

Ok(())
}
82 changes: 82 additions & 0 deletions examples/src/dynamic_load_balance/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}

use futures::Stream;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::sync::mpsc;
use tonic::{transport::Server, Request, Response, Status, Streaming};

use pb::{EchoRequest, EchoResponse};

type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send + Sync>>;

#[derive(Debug)]
pub struct EchoServer {
addr: SocketAddr,
}

#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
async fn unary_echo(&self, request: Request<EchoRequest>) -> EchoResult<EchoResponse> {
let message = format!("{} (from {})", request.into_inner().message, self.addr);

Ok(Response::new(EchoResponse { message }))
}

type ServerStreamingEchoStream = ResponseStream;

async fn server_streaming_echo(
&self,
_: Request<EchoRequest>,
) -> EchoResult<Self::ServerStreamingEchoStream> {
Err(Status::unimplemented("not implemented"))
}

async fn client_streaming_echo(
&self,
_: Request<Streaming<EchoRequest>>,
) -> EchoResult<EchoResponse> {
Err(Status::unimplemented("not implemented"))
}

type BidirectionalStreamingEchoStream = ResponseStream;

async fn bidirectional_streaming_echo(
&self,
_: Request<Streaming<EchoRequest>>,
) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
Err(Status::unimplemented("not implemented"))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addrs = ["[::1]:50051", "[::1]:50052"];

let (tx, mut rx) = mpsc::unbounded_channel();

for addr in &addrs {
let addr = addr.parse()?;
let tx = tx.clone();

let server = EchoServer { addr };
let serve = Server::builder()
.add_service(pb::echo_server::EchoServer::new(server))
.serve(addr);

tokio::spawn(async move {
if let Err(e) = serve.await {
eprintln!("Error = {:?}", e);
}

tx.send(()).unwrap();
});
}

rx.recv().await;

Ok(())
}
37 changes: 25 additions & 12 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use endpoint::Endpoint;
#[cfg(feature = "tls")]
pub use tls::ClientTlsConfig;

use super::service::{Connection, ServiceList};
use super::service::{Connection, DynamicServiceStream};
use crate::{body::BoxBody, client::GrpcService};
use bytes::Bytes;
use http::{
Expand All @@ -20,13 +20,18 @@ use hyper::client::connect::Connection as HyperConnection;
use std::{
fmt,
future::Future,
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::{channel, Sender},
};

use tower::{
buffer::{self, Buffer},
discover::Discover,
discover::{Change, Discover},
util::{BoxService, Either},
Service,
};
Expand Down Expand Up @@ -104,17 +109,25 @@ impl Channel {
/// This creates a [`Channel`] that will load balance accross all the
/// provided endpoints.
pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
let list = list.collect::<Vec<_>>();
let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
list.for_each(|endpoint| {
tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
.unwrap();
});

let buffer_size = list
.iter()
.next()
.and_then(|e| e.buffer_size)
.unwrap_or(DEFAULT_BUFFER_SIZE);

let discover = ServiceList::new(list);
channel
}

Self::balance(discover, buffer_size)
/// Balance a list of [`Endpoint`]'s.
///
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
where
K: Hash + Eq + Send + Clone + 'static,
{
let (tx, rx) = channel(capacity);
let list = DynamicServiceStream::new(rx);
(Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
Expand Down
Loading