Skip to content

Commit

Permalink
feat(transport): Add tracing support to server (#175)
Browse files Browse the repository at this point in the history
* feat(transport): Add tracing to server

* Add tracing example

* Fix duplicate versions of tracing-subscriber
  • Loading branch information
LucioFranco authored Dec 12, 2019
1 parent 1626c2e commit f46a454
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 10 deletions.
14 changes: 14 additions & 0 deletions tonic-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ path = "src/multiplex/client.rs"
name = "gcp-client"
path = "src/gcp/client.rs"

[[bin]]
name = "tracing-client"
path = "src/tracing/client.rs"

[[bin]]
name = "tracing-server"
path = "src/tracing/server.rs"

[dependencies]
tonic = { path = "../tonic", features = ["tls"] }
bytes = "0.4"
Expand All @@ -82,6 +90,12 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.6"

# Tracing
tracing = "0.1"
tracing-subscriber = { version = "0.2.0-alpha", features = ["tracing-log"] }
tracing-attributes = "0.1"
tracing-futures = "0.2"

# Required for wellknown types
prost-types = "0.5"

Expand Down
38 changes: 38 additions & 0 deletions tonic-examples/src/tracing/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
pub mod hello_world {
tonic::include_proto!("helloworld");
}

use hello_world::{greeter_client::GreeterClient, HelloRequest};
use tracing_attributes::instrument;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.init();

say_hi("Bob".into()).await?;

Ok(())
}

#[instrument]
async fn say_hi(name: String) -> Result<(), Box<dyn std::error::Error>> {
let mut client = GreeterClient::connect("http://[::1]:50051").await?;

let request = tonic::Request::new(HelloRequest { name });

tracing::info!(
message = "Sending request.",
request = %request.get_ref().name
);

let response = client.say_hello(request).await?;

tracing::info!(
message = "Got a response.",
response = %response.get_ref().message
);

Ok(())
}
51 changes: 51 additions & 0 deletions tonic-examples/src/tracing/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use tonic::{transport::Server, Request, Response, Status};

pub mod hello_world {
tonic::include_proto!("helloworld");
}

use hello_world::{
greeter_server::{Greeter, GreeterServer},
HelloReply, HelloRequest,
};

#[derive(Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
tracing::info!(message = "Inbound request.", metadata = ?request.metadata());

let reply = hello_world::HelloReply {
message: format!("Hello {}!", request.into_inner().name).into(),
};

tracing::debug!(message = "Sending reply.", response = %reply.message);

Ok(Response::new(reply))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.init();

let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();

tracing::info!(message = "Starting server.", %addr);

Server::builder()
.trace_fn(|_| tracing::info_span!("helloworld_server"))
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;

Ok(())
}
2 changes: 1 addition & 1 deletion tonic-interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ console = "0.9"
structopt = "0.2"

tracing = "0.1"
tracing-subscriber = "0.1.3"
tracing-subscriber = "0.2.0-alpha"
tracing-log = "0.1.0"

[build-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ transport = [
"tower",
"tower-balance",
"tower-load",
"tracing-futures",
]
tls = ["tokio-rustls"]
tls-roots = ["rustls-native-certs"]
Expand Down Expand Up @@ -68,6 +69,7 @@ tower = { git = "https://github.com/tower-rs/tower", optional = true}
tower-make = { version = "0.3", features = ["connect"] }
tower-balance = { git = "https://github.com/tower-rs/tower", optional = true }
tower-load = { git = "https://github.com/tower-rs/tower", optional = true }
tracing-futures = { version = "0.2", optional = true }

# rustls
tokio-rustls = { version = "0.12", optional = true }
Expand Down
25 changes: 25 additions & 0 deletions tonic/src/transport/server/future copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[pin_project]
pub struct ResponseFuture<F> {
inner: F,
span: Option<Span>,
}

impl<F: Future> Future for ResponseFuture<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();

if let Some(span) = me.span.clone().take() {
let _enter = span.enter();
// me.poll(cx).map_err(Into::into)
}
}
}
25 changes: 25 additions & 0 deletions tonic/src/transport/server/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[pin_project]
pub struct ResponseFuture<F> {
inner: F,
span: Option<Span>,
}

impl<F: Future> Future for ResponseFuture<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();

if let Some(span) = me.span.clone().take() {
let _enter = span.enter();
// me.poll(cx).map_err(Into::into)
}
}
}
50 changes: 41 additions & 9 deletions tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures_util::{
future::{self, poll_fn, MapErr},
TryFutureExt,
};
use http::{Request, Response};
use http::{HeaderMap, Request, Response};
use hyper::{
server::{accept::Accept, conn},
Body,
Expand All @@ -39,9 +39,11 @@ use tower::{
};
#[cfg(feature = "tls")]
use tracing::error;
use tracing_futures::{Instrument, Instrumented};

type BoxService = tower::util::BoxService<Request<Body>, Response<BoxBody>, crate::Error>;
type Interceptor = Arc<dyn Layer<BoxService, Service = BoxService> + Send + Sync + 'static>;
type TraceInterceptor = Arc<dyn Fn(&HeaderMap) -> tracing::Span + Send + Sync + 'static>;

/// A default batteries included `transport` server.
///
Expand All @@ -54,6 +56,7 @@ type Interceptor = Arc<dyn Layer<BoxService, Service = BoxService> + Send + Sync
#[derive(Default, Clone)]
pub struct Server {
interceptor: Option<Interceptor>,
trace_interceptor: Option<TraceInterceptor>,
concurrency_limit: Option<usize>,
// timeout: Option<Duration>,
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -211,6 +214,17 @@ impl Server {
}
}

/// Intercept inbound headers and add a [`tracing::Span`] to each response future.
pub fn trace_fn<F>(self, f: F) -> Self
where
F: Fn(&HeaderMap) -> tracing::Span + Send + Sync + 'static,
{
Server {
trace_interceptor: Some(Arc::new(f)),
..self
}
}

/// Create a router with the `S` typed service as the first service.
///
/// This will clone the `Server` builder and create a router that will
Expand Down Expand Up @@ -241,6 +255,7 @@ impl Server {
F: Future<Output = ()>,
{
let interceptor = self.interceptor.clone();
let span = self.trace_interceptor.clone();
let concurrency_limit = self.concurrency_limit;
let init_connection_window_size = self.init_connection_window_size;
let init_stream_window_size = self.init_stream_window_size;
Expand Down Expand Up @@ -282,6 +297,7 @@ impl Server {
interceptor,
concurrency_limit,
// timeout,
span,
};

let server = hyper::Server::builder(incoming)
Expand Down Expand Up @@ -399,8 +415,10 @@ impl fmt::Debug for Server {
}
}

#[derive(Debug)]
struct Svc<S>(S);
struct Svc<S> {
inner: S,
span: Option<TraceInterceptor>,
}

impl<S> Service<Request<Body>> for Svc<S>
where
Expand All @@ -409,14 +427,26 @@ where
{
type Response = Response<BoxBody>;
type Error = crate::Error;
type Future = MapErr<S::Future, fn(S::Error) -> crate::Error>;
type Future = MapErr<Instrumented<S::Future>, fn(S::Error) -> crate::Error>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
self.0.call(req).map_err(|e| e.into())
let span = if let Some(trace_interceptor) = &self.span {
trace_interceptor(req.headers())
} else {
tracing::Span::none()
};

self.inner.call(req).instrument(span).map_err(|e| e.into())
}
}

impl<S> fmt::Debug for Svc<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Svc").finish()
}
}

Expand All @@ -425,6 +455,7 @@ struct MakeSvc<S> {
concurrency_limit: Option<usize>,
// timeout: Option<Duration>,
inner: S,
span: Option<TraceInterceptor>,
}

impl<S, T> Service<T> for MakeSvc<S>
Expand All @@ -447,6 +478,7 @@ where
let svc = self.inner.clone();
let concurrency_limit = self.concurrency_limit;
// let timeout = self.timeout.clone();
let span = self.span.clone();

Box::pin(async move {
let svc = ServiceBuilder::new()
Expand All @@ -455,10 +487,10 @@ where
.service(svc);

let svc = if let Some(interceptor) = interceptor {
let layered = interceptor.layer(BoxService::new(Svc(svc)));
BoxService::new(Svc(layered))
let layered = interceptor.layer(BoxService::new(Svc { inner: svc, span }));
BoxService::new(layered)
} else {
BoxService::new(Svc(svc))
BoxService::new(Svc { inner: svc, span })
};

Ok(svc)
Expand Down

0 comments on commit f46a454

Please sign in to comment.