Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): Support timeouts with "grpc-timeout" header #606

Merged
merged 12 commits into from
Apr 29, 2021
1 change: 1 addition & 0 deletions tests/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = "1.0"

[dev-dependencies]
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net"] }
tokio-stream = { version = "0.1.5", features = ["net"] }

[build-dependencies]
tonic-build = { path = "../../tonic-build" }
94 changes: 94 additions & 0 deletions tests/integration_tests/tests/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use integration_tests::pb::{test_client, test_server, Input, Output};
use std::time::Duration;
use tokio::net::TcpListener;
use tonic::{transport::Server, Code, Request, Response, Status};

#[tokio::test]
#[ignore]
async fn cancelation_on_timeout() {
struct Svc;

#[tonic::async_trait]
impl test_server::Test for Svc {
async fn unary_call(&self, _req: Request<Input>) -> Result<Response<Output>, Status> {
// Wait for a time longer than the timeout
tokio::time::sleep(Duration::from_millis(1_000)).await;
Ok(Response::new(Output {}))
}
}

let svc = test_server::TestServer::new(Svc);

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
});

let mut client = test_client::TestClient::connect(format!("http://{}", addr))
.await
.unwrap();

let mut req = Request::new(Input {});
req.metadata_mut()
.insert("grpc-timeout", "500m".parse().unwrap());

let res = client.unary_call(req).await;
dbg!(&res);

let err = res.unwrap_err();
assert!(err.message().contains("Timeout expired"));

// TODO(david): make this work. Will require mapping `TimeoutExpired` errors into
// `Code::Cancelled` but can't quite figure out how to do that.
assert_eq!(err.code(), Code::Cancelled);
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
}

#[tokio::test]
async fn picks_the_shortest_timeout() {
struct Svc;

#[tonic::async_trait]
impl test_server::Test for Svc {
async fn unary_call(&self, _req: Request<Input>) -> Result<Response<Output>, Status> {
// Wait for a time longer than the timeout
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(Response::new(Output {}))
}
}

let svc = test_server::TestServer::new(Svc);

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

tokio::spawn(async move {
Server::builder()
.timeout(Duration::from_millis(100))
.add_service(svc)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
});

let mut client = test_client::TestClient::connect(format!("http://{}", addr))
.await
.unwrap();

let mut req = Request::new(Input {});
req.metadata_mut()
// 10 hours
.insert("grpc-timeout", "10H".parse().unwrap());

// TODO(david): for some reason this fails with "h2 protocol error: protocol error: unexpected
// internal error encountered". Seems to be happening on `master` as well. Bug?
let res = client.unary_call(req).await;
dbg!(&res);
let err = res.unwrap_err();
assert!(err.message().contains("Timeout expired"));
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 2 additions & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ transport = [
"tokio",
"tower",
"tracing-futures",
"tokio/macros"
"tokio/macros",
"tokio/time",
]
tls = ["transport", "tokio-rustls"]
tls-roots = ["tls", "rustls-native-certs"]
Expand Down
3 changes: 1 addition & 2 deletions tonic/src/metadata/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,10 @@ pub struct OccupiedEntry<'a, VE: ValueEncoding> {

impl MetadataMap {
// Headers reserved by the gRPC protocol.
pub(crate) const GRPC_RESERVED_HEADERS: [&'static str; 8] = [
pub(crate) const GRPC_RESERVED_HEADERS: [&'static str; 7] = [
davidpdrsn marked this conversation as resolved.
Show resolved Hide resolved
"te",
"user-agent",
"content-type",
"grpc-timeout",
"grpc-message",
"grpc-encoding",
"grpc-message-type",
Expand Down
7 changes: 4 additions & 3 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::super::BoxFuture;
use super::{layer::ServiceBuilderExt, reconnect::Reconnect, AddOrigin, UserAgent};
use super::{
layer::ServiceBuilderExt, reconnect::Reconnect, timeout::Timeout, AddOrigin, UserAgent,
};
use crate::{body::BoxBody, transport::Endpoint};
use http::Uri;
use hyper::client::conn::Builder;
Expand All @@ -14,7 +16,6 @@ use tower::load::Load;
use tower::{
layer::Layer,
limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
timeout::TimeoutLayer,
util::BoxService,
ServiceBuilder, ServiceExt,
};
Expand Down Expand Up @@ -53,7 +54,7 @@ impl Connection {
let stack = ServiceBuilder::new()
.layer_fn(|s| AddOrigin::new(s, endpoint.uri.clone()))
.layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
.optional_layer(endpoint.timeout.map(TimeoutLayer::new))
.layer_fn(|s| Timeout::new(s, endpoint.timeout))
.optional_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
.optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
.into_inner();
Expand Down
1 change: 1 addition & 0 deletions tonic/src/transport/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod io;
mod layer;
mod reconnect;
mod router;
mod timeout;
#[cfg(feature = "tls")]
mod tls;
mod user_agent;
Expand Down
Loading