From d26237a9abf97ed995d8df254ca50f5335b54847 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 2 Nov 2022 15:48:05 +0100 Subject: [PATCH] add connection span --- server/src/server.rs | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/server/src/server.rs b/server/src/server.rs index 538fcbc24d..ee0acfa3f8 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -55,6 +55,7 @@ use tokio::sync::{watch, OwnedSemaphorePermit}; use tokio_util::compat::TokioAsyncReadCompatExt; use tower::layer::util::Identity; use tower::{Layer, Service}; +use tracing::{instrument, Instrument}; /// Default maximum connections allowed. const MAX_CONNECTIONS: u32 = 100; @@ -183,6 +184,8 @@ where stop_handle.clone(), curr_conns, max_conns, + remote_addr, + id, ))); id = id.wrapping_add(1); @@ -625,22 +628,25 @@ impl hyper::service::Service> for TowerSe self.inner.logger.on_connect(self.inner.remote_addr, &request, TransportProtocol::WebSocket); let data = self.inner.clone(); - tokio::spawn(async move { - let upgraded = match hyper::upgrade::on(request).await { - Ok(u) => u, - Err(e) => { - tracing::warn!("Could not upgrade connection: {}", e); - return; - } - }; - - let stream = BufReader::new(BufWriter::new(upgraded.compat())); - let mut ws_builder = server.into_builder(stream); - ws_builder.set_max_message_size(data.max_request_body_size as usize); - let (sender, receiver) = ws_builder.finish(); - - let _ = ws::background_task::(sender, receiver, data).await; - }); + tokio::spawn( + async move { + let upgraded = match hyper::upgrade::on(request).await { + Ok(u) => u, + Err(e) => { + tracing::warn!("Could not upgrade connection: {}", e); + return; + } + }; + + let stream = BufReader::new(BufWriter::new(upgraded.compat())); + let mut ws_builder = server.into_builder(stream); + ws_builder.set_max_message_size(data.max_request_body_size as usize); + let (sender, receiver) = ws_builder.finish(); + + let _ = ws::background_task::(sender, receiver, data).await; + } + .in_current_span(), + ); response.map(|()| hyper::Body::empty()) } @@ -723,12 +729,15 @@ where } // Attempts to accept a new connection +#[instrument(name = "connection", skip(socket, service, stop_handle, curr_conns, max_conns), level = "INFO")] async fn try_accept_connection( socket: TcpStream, service: S, mut stop_handle: StopHandle, curr_conns: usize, max_conns: usize, + remote_addr: SocketAddr, + conn_id: u32, ) where S: Service, Response = hyper::Response> + Send + 'static, S::Error: Into>,