From 9f589802d0b725737a1c8c0d40507667dc141668 Mon Sep 17 00:00:00 2001
From: Erin Power
Date: Thu, 23 Dec 2021 13:12:03 +0100
Subject: [PATCH] Completely remove slog and replace with tracing

---
 Cargo.toml                            |   5 +-
 src/cluster/cluster_manager.rs        |  19 +--
 src/endpoint/address.rs               |  11 --
 src/filters/manager.rs                |  37 +----
 src/lib.rs                            |   2 +-
 src/proxy.rs                          |   2 +-
 src/proxy/admin.rs                    |  12 +-
 src/proxy/builder.rs                  |  28 +---
 src/proxy/health.rs                   |  12 +-
 src/proxy/metrics.rs                  |  21 +--
 src/proxy/server.rs                   | 126 +++++++---------
 src/proxy/server/resource_manager.rs  |  37 ++---
 src/proxy/sessions/session.rs         | 200 ++++++++++++--------------
 src/proxy/sessions/session_manager.rs |  40 ++----
 src/runner.rs                         |   3 -
 src/test_utils.rs                     |  36 +----
 src/xds/ads_client.rs                 |  62 +++-----
 src/xds/cluster.rs                    |  39 ++---
 src/xds/listener.rs                   |  16 +--
 tests/compress.rs                     |   3 +-
 tests/filters.rs                      |   5 +-
 tests/firewall.rs                     |   5 +-
 tests/metrics.rs                      |   4 +-
 tests/xds.rs                          |  20 +--
 24 files changed, 246 insertions(+), 499 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 92a471b4f7..c805763072 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,10 +57,6 @@ rand = "0.8.4"
 serde = { version = "1.0.130", features = ["derive", "rc"] }
 serde_json = "1.0.68"
 serde_yaml = "0.8.21"
-slog = "2.7.0"
-slog-async = "2.7.0"
-slog-json = "2.4.0"
-slog-term = "2.8.0"
 snap = "1.0.5"
 tokio = { version = "1.13.1", features = ["rt-multi-thread", "signal", "test-util", "parking_lot"] }
 tokio-stream = "0.1.8"
@@ -73,6 +69,7 @@ tryhard = "0.4.0"
 eyre = "0.6.5"
 stable-eyre = "0.2.2"
 ipnetwork = "0.18.0"
+futures = "0.3.17"

 [target.'cfg(target_os = "linux")'.dependencies]
 sys-info = "0.9.0"
diff --git a/src/cluster/cluster_manager.rs b/src/cluster/cluster_manager.rs
index 7ea6a05b33..0f5172b0be 100644
--- a/src/cluster/cluster_manager.rs
+++ b/src/cluster/cluster_manager.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 // and we will need to acquire a read lock with every packet that is processed
 // to be able to capture the current endpoint state and pass it to Filters.
 use parking_lot::RwLock;
-use slog::{debug, o, warn, Logger};
 use prometheus::{Registry, Result as MetricsResult};
 use tokio::sync::{mpsc, watch};

@@ -81,13 +80,10 @@ impl ClusterManager {
     /// Returns a ClusterManager where the set of clusters is continuously
     /// updated based on responses from the provided updates channel.
     pub fn dynamic(
-        base_logger: Logger,
         metrics_registry: &Registry,
         cluster_updates_rx: mpsc::Receiver<ClusterUpdate>,
         shutdown_rx: watch::Receiver<()>,
     ) -> MetricsResult<SharedClusterManager> {
-        let log = base_logger.new(o!("source" => "cluster::ClusterManager"));
-
         let cluster_manager = Self::new(metrics_registry, None)?;
         let metrics = cluster_manager.metrics.clone();
         let cluster_manager = Arc::new(RwLock::new(cluster_manager));
@@ -95,7 +91,6 @@ impl ClusterManager {
         // Start a task in the background to receive cluster updates
         // and update the cluster manager's cluster set in turn.
         Self::spawn_updater(
-            log.clone(),
             metrics,
             cluster_manager.clone(),
             cluster_updates_rx,
@@ -144,7 +139,6 @@ impl ClusterManager {
     /// Spawns a task to run a loop that receives cluster updates
    /// and updates the ClusterManager's state in turn.
fn spawn_updater( - log: Logger, metrics: Metrics, cluster_manager: Arc>, mut cluster_updates_rx: mpsc::Receiver, @@ -156,17 +150,17 @@ impl ClusterManager { update = cluster_updates_rx.recv() => { match update { Some(update) => { - debug!(log, "Received a cluster update."); + tracing::debug!("Received a cluster update."); cluster_manager.write().update(Self::process_cluster_update(&metrics, update)); } None => { - warn!(log, "Exiting cluster update receive loop because the sender dropped the channel."); + tracing::warn!("Exiting cluster update receive loop because the sender dropped the channel."); return; } } } _ = shutdown_rx.changed() => { - debug!(log, "Exiting cluster update receive loop because a shutdown signal was received."); + tracing::debug!("Exiting cluster update receive loop because a shutdown signal was received."); return; }, } @@ -185,15 +179,13 @@ mod tests { cluster::{Cluster, LocalityEndpoints}, endpoint::{Endpoint, Endpoints, Metadata}, metadata::MetadataView, - test_utils::logger, }; #[tokio::test] async fn dynamic_cluster_manager_process_cluster_update() { let (update_tx, update_rx) = mpsc::channel(3); let (_shutdown_tx, shutdown_rx) = watch::channel(()); - let cm = ClusterManager::dynamic(logger(), &Registry::default(), update_rx, shutdown_rx) - .unwrap(); + let cm = ClusterManager::dynamic(&Registry::default(), update_rx, shutdown_rx).unwrap(); fn mapping(entries: &[(&str, &str)]) -> serde_yaml::Mapping { entries @@ -301,8 +293,7 @@ mod tests { async fn dynamic_cluster_manager_metrics() { let (update_tx, update_rx) = mpsc::channel(3); let (_shutdown_tx, shutdown_rx) = watch::channel(()); - let cm = ClusterManager::dynamic(logger(), &Registry::default(), update_rx, shutdown_rx) - .unwrap(); + let cm = ClusterManager::dynamic(&Registry::default(), update_rx, shutdown_rx).unwrap(); // Initialization metrics { diff --git a/src/endpoint/address.rs b/src/endpoint/address.rs index 6f8e4d1e91..48db170e60 100644 --- a/src/endpoint/address.rs +++ b/src/endpoint/address.rs @@ -227,17 +227,6 @@ impl<'de> Deserialize<'de> for EndpointAddress { } } -impl slog::Value for EndpointAddress { - fn serialize( - &self, - _: &slog::Record, - key: slog::Key, - serializer: &mut dyn slog::Serializer, - ) -> slog::Result { - serializer.emit_arguments(key, &format_args!("{}", self)) - } -} - /// The kind of address, such as Domain Name or IP address. **Note** that /// the `FromStr` implementation doesn't actually validate that the name is /// resolvable. Use [`EndpointAddress`] for complete address validation. diff --git a/src/filters/manager.rs b/src/filters/manager.rs index 4be472b69b..a2c21517ee 100644 --- a/src/filters/manager.rs +++ b/src/filters/manager.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use parking_lot::RwLock; use prometheus::Registry; -use slog::{debug, o, warn, Logger}; use tokio::sync::mpsc; use tokio::sync::watch; @@ -72,13 +71,10 @@ impl FilterManager { /// Returns a new instance backed by a stream of filter chain updates. /// Updates from the provided stream will be reflected in the current filter chain. pub fn dynamic( - base_logger: Logger, metrics_registry: &Registry, filter_chain_updates_rx: mpsc::Receiver>, shutdown_rx: watch::Receiver<()>, ) -> Result { - let log = Self::create_logger(base_logger); - let filter_manager = Arc::new(RwLock::new(FilterManager { // Start out with an empty filter chain. 
filter_chain: Arc::new(FilterChain::new(vec![], metrics_registry)?), @@ -86,12 +82,7 @@ impl FilterManager { // Start a task in the background to receive LDS updates // and update the FilterManager's filter chain in turn. - Self::spawn_updater( - log, - filter_manager.clone(), - filter_chain_updates_rx, - shutdown_rx, - ); + Self::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx); Ok(filter_manager) } @@ -99,7 +90,6 @@ impl FilterManager { /// Spawns a task in the background that listens for filter chain updates and /// updates the filter manager's current filter in turn. fn spawn_updater( - log: Logger, filter_manager: SharedFilterManager, mut filter_chain_updates_rx: mpsc::Receiver>, mut shutdown_rx: watch::Receiver<()>, @@ -110,34 +100,29 @@ impl FilterManager { update = filter_chain_updates_rx.recv() => { match update { Some(filter_chain) => { - debug!(log, "Received a filter chain update."); + tracing::debug!("Received a filter chain update."); filter_manager.write().update(filter_chain); } None => { - warn!(log, "Exiting filter chain update receive loop because the sender dropped the channel."); + tracing::warn!("Exiting filter chain update receive loop because the sender dropped the channel."); return; } } } _ = shutdown_rx.changed() => { - debug!(log, "Exiting filter chain update receive loop because a shutdown signal was received."); + tracing::debug!("Exiting filter chain update receive loop because a shutdown signal was received."); return; }, } } }); } - - fn create_logger(base_logger: Logger) -> Logger { - base_logger.new(o!("source" => "FilterManager")) - } } #[cfg(test)] mod tests { use super::FilterManager; use crate::filters::{Filter, FilterChain, FilterInstance, ReadContext, ReadResponse}; - use crate::test_utils::logger; use std::sync::Arc; use std::time::Duration; @@ -155,12 +140,7 @@ mod tests { let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10); let (_shutdown_tx, shutdown_rx) = watch::channel(()); - FilterManager::spawn_updater( - logger(), - filter_manager.clone(), - filter_chain_updates_rx, - shutdown_rx, - ); + FilterManager::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx); let filter_chain = { let manager_guard = filter_manager.read(); @@ -233,12 +213,7 @@ mod tests { let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10); let (shutdown_tx, shutdown_rx) = watch::channel(()); - FilterManager::spawn_updater( - logger(), - filter_manager.clone(), - filter_chain_updates_rx, - shutdown_rx, - ); + FilterManager::spawn_updater(filter_manager.clone(), filter_chain_updates_rx, shutdown_rx); // Send a shutdown signal. 
shutdown_tx.send(()).unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 50af6f040d..50662c32d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,7 +36,7 @@ pub type Result = std::result::Result; #[doc(inline)] pub use self::{ config::Config, - proxy::{logger, Builder, PendingValidation, Server, Validated}, + proxy::{Builder, PendingValidation, Server, Validated}, runner::{run, run_with_config}, }; diff --git a/src/proxy.rs b/src/proxy.rs index 38f24f3657..521a36bda9 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -15,7 +15,7 @@ */ pub(crate) use admin::Admin; -pub use builder::{logger, Builder, PendingValidation, Validated}; +pub use builder::{Builder, PendingValidation, Validated}; pub(crate) use health::Health; pub(crate) use metrics::Metrics; pub use server::Server; diff --git a/src/proxy/admin.rs b/src/proxy/admin.rs index 6c8ba694b0..06f8710899 100644 --- a/src/proxy/admin.rs +++ b/src/proxy/admin.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server as HyperServer, StatusCode}; -use slog::{error, info, o, Logger}; use tokio::sync::watch; use crate::cluster::cluster_manager::SharedClusterManager; @@ -28,7 +27,6 @@ use crate::filters::manager::SharedFilterManager; use crate::proxy::{config_dump, Health, Metrics}; pub struct Admin { - log: Logger, /// The address that the Admin server starts on addr: SocketAddr, metrics: Arc, @@ -44,9 +42,8 @@ struct HandleRequestArgs { } impl Admin { - pub fn new(base: &Logger, addr: SocketAddr, metrics: Arc, heath: Health) -> Self { + pub fn new(addr: SocketAddr, metrics: Arc, heath: Health) -> Self { Admin { - log: base.new(o!("source" => "proxy::Admin")), addr, metrics, health: Arc::new(heath), @@ -59,7 +56,7 @@ impl Admin { filter_manager: SharedFilterManager, mut shutdown_rx: watch::Receiver<()>, ) { - info!(self.log, "Starting admin endpoint"; "address" => self.addr.to_string()); + tracing::info!(address = %self.addr, "Starting admin endpoint"); let args = HandleRequestArgs { metrics: self.metrics.clone(), @@ -84,10 +81,9 @@ impl Admin { shutdown_rx.changed().await.ok(); }); - let log = self.log.clone(); tokio::spawn(async move { - if let Err(err) = server.await { - error!(log, "Admin server exited with an error"; "error" => %err); + if let Err(error) = server.await { + tracing::error!(%error, "Admin server exited with an error"); } }); } diff --git a/src/proxy/builder.rs b/src/proxy/builder.rs index efb0b1c27e..bd2f83967d 100644 --- a/src/proxy/builder.rs +++ b/src/proxy/builder.rs @@ -17,7 +17,6 @@ use std::{collections::HashSet, convert::TryInto, marker::PhantomData, sync::Arc}; use prometheus::Registry; -use slog::{o, Drain, Logger}; use tonic::transport::Endpoint as TonicEndpoint; use crate::config::{Config, ManagementServer, Proxy, Source, ValidationError, ValueInvalidArgs}; @@ -87,7 +86,6 @@ impl ValidationStatus for PendingValidation { /// Represents the components needed to create a Server. 
pub struct Builder { - log: Logger, config: Arc, filter_registry: FilterRegistry, admin: Option, @@ -97,16 +95,14 @@ pub struct Builder { impl From> for Builder { fn from(config: Arc) -> Self { - let log = logger(); - let metrics = Arc::new(Metrics::new(&log, Registry::default())); - let health = Health::new(&log); - let admin = ProxyAdmin::new(&log, config.admin.address, metrics.clone(), health); + let metrics = Arc::new(Metrics::new(Registry::default())); + let health = Health::new(); + let admin = ProxyAdmin::new(config.admin.address, metrics.clone(), health); Builder { config, filter_registry: FilterRegistry::new(FilterSet::default()), admin: Some(admin), metrics, - log, validation_status: PendingValidation, } } @@ -199,10 +195,6 @@ impl ValidatedConfig { } impl Builder { - pub fn with_log(self, log: Logger) -> Self { - Self { log, ..self } - } - pub fn with_filter_registry(self, filter_registry: FilterRegistry) -> Self { Self { filter_registry, @@ -224,7 +216,6 @@ impl Builder { ValidatedConfig::validate(self.config.clone(), &self.filter_registry, &self.metrics)?; Ok(Builder { - log: self.log, config: self.config, admin: self.admin, metrics: self.metrics, @@ -237,7 +228,6 @@ impl Builder { impl Builder { pub fn build(self) -> Server { Server { - log: self.log.new(o!("source" => "server::Server")), config: Arc::new(self.validation_status.0), proxy_metrics: ProxyMetrics::new(&self.metrics.registry) .expect("proxy metrics should be setup properly"), @@ -250,18 +240,6 @@ impl Builder { } } -/// Create a new `slog::Logger` instance using the default -/// quilkin configuration. -pub fn logger() -> Logger { - let drain = slog_json::Json::new(std::io::stdout()) - .set_pretty(false) - .add_default_keys() - .build() - .fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - slog::Logger::root(drain, o!()) -} - #[cfg(test)] mod tests { use std::convert::TryFrom; diff --git a/src/proxy/health.rs b/src/proxy/health.rs index 1b6a9d08c7..3ecdf6e07f 100644 --- a/src/proxy/health.rs +++ b/src/proxy/health.rs @@ -17,28 +17,24 @@ use std::sync::atomic::AtomicBool; use hyper::{Body, Response, StatusCode}; -use slog::{error, o, Logger}; use std::panic; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; pub struct Health { - log: Logger, healthy: Arc, } impl Health { - pub fn new(base: &Logger) -> Self { + pub fn new() -> Self { let health = Self { - log: base.new(o!("source" => "proxy::Health")), healthy: Arc::new(AtomicBool::new(true)), }; - let log = health.log.clone(); let healthy = health.healthy.clone(); let default_hook = panic::take_hook(); panic::set_hook(Box::new(move |panic_info| { - error!(log, "Panic has occurred. Moving to Unhealthy"); + tracing::error!("Panic has occurred. Moving to Unhealthy"); healthy.swap(false, Relaxed); default_hook(panic_info); })); @@ -61,14 +57,12 @@ impl Health { #[cfg(test)] mod tests { use crate::proxy::health::Health; - use crate::test_utils::logger; use hyper::StatusCode; use std::panic; #[test] fn panic_hook() { - let log = logger(); - let health = Health::new(&log); + let health = Health::new(); let response = health.check_healthy(); assert_eq!(response.status(), StatusCode::OK); diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index 90ec34bcc7..cb7da3a3b0 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -16,21 +16,16 @@ use hyper::{Body, Response, StatusCode}; use prometheus::{Encoder, Registry, TextEncoder}; -use slog::{o, warn, Logger}; /// Metrics contains metrics configuration for the server. 
#[derive(Clone)] pub struct Metrics { - log: Logger, pub(crate) registry: Registry, } impl Metrics { - pub fn new(base: &Logger, registry: Registry) -> Self { - Metrics { - log: base.new(o!("source" => "proxy::Metrics")), - registry, - } + pub fn new(registry: Registry) -> Self { + Metrics { registry } } pub fn collect_metrics(&self) -> Response { @@ -39,11 +34,11 @@ impl Metrics { let encoder = TextEncoder::new(); let body = encoder .encode(&self.registry.gather(), &mut buffer) - .map_err(|err| warn!(self.log, "Failed to encode metrics"; "error" => %err)) + .map_err(|error| tracing::warn!(%error, "Failed to encode metrics")) .and_then(|_| { - String::from_utf8(buffer).map(Body::from).map_err( - |err| warn!(self.log, "Failed to convert metrics to utf8"; "error" => %err), - ) + String::from_utf8(buffer) + .map(Body::from) + .map_err(|error| tracing::warn!(%error, "Failed to convert metrics to utf8")) }); match body { @@ -65,12 +60,10 @@ mod tests { use prometheus::Registry; use crate::proxy::Metrics; - use crate::test_utils::logger; #[tokio::test] async fn collect_metrics() { - let log = logger(); - let metrics = Metrics::new(&log, Registry::default()); + let metrics = Metrics::new(Registry::default()); let response = metrics.collect_metrics(); assert_eq!(response.status(), StatusCode::OK); } diff --git a/src/proxy/server.rs b/src/proxy/server.rs index 751373435b..c014f6262d 100644 --- a/src/proxy/server.rs +++ b/src/proxy/server.rs @@ -18,7 +18,6 @@ use std::net::{Ipv4Addr, SocketAddrV4}; use std::sync::Arc; use prometheus::HistogramTimer; -use slog::{debug, error, info, trace, warn, Logger}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, watch}; use tokio::task::JoinHandle; @@ -51,7 +50,6 @@ mod resource_manager; /// Server is the UDP server main implementation pub struct Server { // We use pub(super) to limit instantiation only to the Builder. - pub(super) log: Logger, pub(super) config: Arc, // Admin may be turned off, primarily for testing. pub(super) admin: Option, @@ -96,7 +94,6 @@ struct DownstreamReceiveWorkerConfig { /// Contains arguments to process a received downstream packet, through the /// filter chain and session pipeline. struct ProcessDownstreamReceiveConfig { - log: Logger, proxy_metrics: ProxyMetrics, session_metrics: SessionMetrics, cluster_manager: SharedClusterManager, @@ -110,10 +107,14 @@ impl Server { /// start the async processing of incoming UDP packets. Will block until an /// event is sent through the stop Receiver. pub async fn run(self, mut shutdown_rx: watch::Receiver<()>) -> Result<()> { - self.log_config(); + tracing::info!( + port = self.config.proxy.port, + proxy_id = &*self.config.proxy.id, + "Starting" + ); let socket = Arc::new(Server::bind(self.config.proxy.port).await?); - let session_manager = SessionManager::new(self.log.clone(), shutdown_rx.clone()); + let session_manager = SessionManager::new(shutdown_rx.clone()); let (send_packets, receive_packets) = mpsc::channel::(1024); let session_ttl = Duration::from_secs(SESSION_TIMEOUT_SECONDS); @@ -140,7 +141,7 @@ impl Server { shutdown_rx: shutdown_rx.clone(), }); - slog::info!(self.log, "Quilkin is ready."); + tracing::info!("Quilkin is ready."); tokio::select! 
{ join_result = recv_loop => { @@ -175,7 +176,6 @@ impl Server { } ValidatedSource::Dynamic { management_servers } => { let manager = DynamicResourceManagers::new( - self.log.clone(), self.config.proxy.id.clone(), self.metrics.registry.clone(), self.filter_registry.clone(), @@ -190,14 +190,13 @@ impl Server { let execution_result_rx = manager.execution_result_rx; // Spawn a task to check for an error if the XDS client // terminates and forward the error upstream. - let log = self.log.clone(); tokio::spawn(async move { - if let Err(err) = execution_result_rx.await { + if let Err(error) = execution_result_rx.await { // TODO: For now only log the error but we would like to // initiate a shut down instead once this happens. - error!( - log, - "ClusterManager XDS client terminated with an error: {}", err + tracing::error!( + %error, + "ClusterManager XDS client terminated with an error" ); } }); @@ -214,7 +213,6 @@ impl Server { /// pipeline. fn run_recv_from(&self, args: RunRecvFromArgs) -> JoinHandle> { let session_manager = args.session_manager; - let log = self.log.clone(); let proxy_metrics = self.proxy_metrics.clone(); let session_metrics = self.session_metrics.clone(); @@ -234,7 +232,6 @@ impl Server { packet_rx, shutdown_rx: args.shutdown_rx.clone(), receive_config: ProcessDownstreamReceiveConfig { - log: log.clone(), proxy_metrics: proxy_metrics.clone(), session_metrics: session_metrics.clone(), cluster_manager: args.cluster_manager.clone(), @@ -248,7 +245,7 @@ impl Server { // Start the worker tasks that pick up received packets from their queue // and processes them. - Self::spawn_downstream_receive_workers(log.clone(), worker_configs); + Self::spawn_downstream_receive_workers(worker_configs); // Start the background task to receive downstream packets from the socket // and place them onto the worker tasks' queue for processing. @@ -282,13 +279,13 @@ impl Server { let error = eyre::eyre!( "Failed to send received packet over channel to worker" ); - error!(log, "{}", error); + tracing::error!(%error); return Err(error); } } Err(error) => { let error = eyre::eyre!(error).wrap_err("Error processing receive socket"); - error!(log, "{}", error); + tracing::error!(%error); return Err(error); } } @@ -299,10 +296,7 @@ impl Server { // For each worker config provided, spawn a background task that sits in a // loop, receiving packets from a queue and processing them through // the filter chain. - fn spawn_downstream_receive_workers( - log: Logger, - worker_configs: Vec, - ) { + fn spawn_downstream_receive_workers(worker_configs: Vec) { for DownstreamReceiveWorkerConfig { worker_id, mut packet_rx, @@ -310,8 +304,6 @@ impl Server { receive_config, } in worker_configs { - let log = log.clone(); - tokio::spawn(async move { loop { tokio::select! 
{ @@ -319,13 +311,13 @@ impl Server { match packet { Some(packet) => Self::process_downstream_received_packet(packet, &receive_config).await, None => { - debug!(log, "Worker-{} exiting: work sender channel was closed.", worker_id); + tracing::debug!(id = worker_id, "work sender channel was closed."); return; } } } _ = shutdown_rx.changed() => { - debug!(log, "Worker-{} exiting: received shutdown signal.", worker_id); + tracing::debug!(id = worker_id, "received shutdown signal."); return; } } @@ -339,11 +331,10 @@ impl Server { packet: DownstreamPacket, args: &ProcessDownstreamReceiveConfig, ) { - trace!( - args.log, - "Packet Received"; - "from" => &packet.from, - "contents" => debug::bytes_to_string(&packet.contents), + tracing::trace!( + from = %packet.from, + contents = %debug::bytes_to_string(&packet.contents), + "Packet Received" ); let endpoints = match args.cluster_manager.read().get_all_endpoints() { @@ -389,7 +380,7 @@ impl Server { let guard = args.session_manager.get_sessions().await; if let Some(session) = guard.get(&session_key) { // If it exists then send the packet, we're done. - Self::session_send_packet_helper(&args.log, session, packet, args.session_ttl).await + Self::session_send_packet_helper(session, packet, args.session_ttl).await } else { // If it does not exist, grab a write lock so that we can create it. // @@ -407,8 +398,7 @@ impl Server { if let Some(session) = guard.get(&session_key) { // If the session now exists then we have less work to do, // simply send the packet. - Self::session_send_packet_helper(&args.log, session, packet, args.session_ttl) - .await; + Self::session_send_packet_helper(session, packet, args.session_ttl).await; } else { // Otherwise, create the session and insert into the map. let session_args = SessionArgs { @@ -420,7 +410,7 @@ impl Server { sender: args.send_packets.clone(), ttl: args.session_ttl, }; - match session_args.into_session(&args.log).await { + match session_args.into_session().await { Ok(session) => { // Insert the session into the map and release the write lock // immediately since we don't want to block other threads while we send @@ -433,23 +423,17 @@ impl Server { // Grab a read lock to send the packet. let guard = args.session_manager.get_sessions().await; if let Some(session) = guard.get(&session_key) { - Self::session_send_packet_helper( - &args.log, - session, - packet, - args.session_ttl, - ) - .await; + Self::session_send_packet_helper(session, packet, args.session_ttl) + .await; } else { - warn!( - args.log, - "Could not find session"; - "key" => format!("({}:{})", session_key.source.to_string(), session_key.destination.to_string()) + tracing::warn!( + key = %format!("({}:{})", session_key.source, session_key.destination), + "Could not find session" ) } } - Err(err) => { - error!(args.log, "Failed to ensure session exists"; "error" => %err); + Err(error) => { + tracing::error!(%error, "Failed to ensure session exists"); } } } @@ -457,19 +441,14 @@ impl Server { } // A helper function to push a session's packet on its socket. 
- async fn session_send_packet_helper( - log: &Logger, - session: &Session, - packet: &[u8], - ttl: Duration, - ) { + async fn session_send_packet_helper(session: &Session, packet: &[u8], ttl: Duration) { match session.send(packet).await { Ok(_) => { - if let Err(err) = session.update_expiration(ttl) { - warn!(log, "Error updating session expiration"; "error" => %err) + if let Err(error) = session.update_expiration(ttl) { + tracing::warn!(%error, "Error updating session expiration") } } - Err(err) => error!(log, "Error sending packet from session"; "error" => %err), + Err(error) => tracing::error!(%error, "Error sending packet from session"), }; } @@ -480,39 +459,32 @@ impl Server { socket: Arc, mut receive_packets: mpsc::Receiver, ) { - let log = self.log.clone(); tokio::spawn(async move { while let Some(packet) = receive_packets.recv().await { - debug!( - log, - "Sending packet back to origin"; - "origin" => packet.dest(), - "contents" => debug::bytes_to_string(packet.contents()), + tracing::debug!( + origin = %packet.dest(), + contents = %debug::bytes_to_string(packet.contents()), + "Sending packet back to origin" ); let address = match packet.dest().to_socket_addr() { Ok(address) => address, Err(error) => { - error!(log, "Error resolving address"; "dest" => %packet.dest(), "error" => %error); + tracing::error!(dest = %packet.dest(), %error, "Error resolving address"); continue; } }; - if let Err(err) = socket.send_to(packet.contents(), address).await { - error!(log, "Error sending packet"; "dest" => %packet.dest(), "error" => %err); + if let Err(error) = socket.send_to(packet.contents(), address).await { + tracing::error!(dest = %packet.dest(), %error, "Error sending packet"); } packet.stop_and_record(); } - debug!(log, "Receiver closed"); + tracing::debug!("Receiver closed"); Ok::<_, eyre::Error>(()) }); } - /// log_config outputs a log of what is configured - fn log_config(&self) { - info!(self.log, "Starting"; "port" => self.config.proxy.port, "proxy_id" => &self.config.proxy.id); - } - /// bind binds the local configured port async fn bind(port: u16) -> Result { let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port); @@ -529,7 +501,6 @@ mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use prometheus::{Histogram, HistogramOpts, Registry}; - use slog::info; use tokio::sync::mpsc; use tokio::time; use tokio::time::timeout; @@ -668,7 +639,7 @@ mod tests { ) -> Result { let t = TestHelper::default(); - info!(t.log, "Test"; "name" => name); + tracing::info!(%name, "Test"); let msg = "hello".to_string(); let endpoint = t.open_socket_and_recv_single_packet().await; @@ -677,7 +648,7 @@ mod tests { // need to switch to 127.0.0.1, as the request comes locally receive_addr.set_ip("127.0.0.1".parse().unwrap()); - let session_manager = SessionManager::new(t.log.clone(), shutdown_rx.clone()); + let session_manager = SessionManager::new(shutdown_rx.clone()); let (send_packets, mut recv_packets) = mpsc::channel::(1); let time_increment = 10; @@ -695,7 +666,7 @@ mod tests { ) .unwrap(); let filter_manager = FilterManager::fixed(chain.clone()); - let metrics = Arc::new(Metrics::new(&t.log, registry.clone())); + let metrics = Arc::new(Metrics::new(registry.clone())); let proxy_metrics = ProxyMetrics::new(&metrics.registry).unwrap(); for worker_id in 0..num_workers { @@ -710,7 +681,6 @@ mod tests { packet_rx, shutdown_rx: shutdown_rx.clone(), receive_config: ProcessDownstreamReceiveConfig { - log: t.log.clone(), proxy_metrics: proxy_metrics.clone(), session_metrics, cluster_manager: 
cluster_manager.clone(), @@ -722,7 +692,7 @@ mod tests { }) } - Server::spawn_downstream_receive_workers(t.log.clone(), worker_configs); + Server::spawn_downstream_receive_workers(worker_configs); for packet_tx in packet_txs { packet_tx @@ -809,7 +779,7 @@ mod tests { let msg = "hello"; let endpoint = t.open_socket_and_recv_single_packet().await; let socket = t.create_socket().await; - let session_manager = SessionManager::new(t.log.clone(), shutdown_rx.clone()); + let session_manager = SessionManager::new(shutdown_rx.clone()); let (send_packets, mut recv_packets) = mpsc::channel::(1); let config = Arc::new(config_with_dummy_endpoint().build()); diff --git a/src/proxy/server/resource_manager.rs b/src/proxy/server/resource_manager.rs index 97491c708d..89c971c726 100644 --- a/src/proxy/server/resource_manager.rs +++ b/src/proxy/server/resource_manager.rs @@ -25,7 +25,6 @@ use crate::{ xds::ads_client::{AdsClient, ClusterUpdate, UPDATES_CHANNEL_BUFFER_SIZE}, }; use prometheus::Registry; -use slog::{o, warn, Logger}; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, watch}; @@ -58,7 +57,6 @@ impl StaticResourceManagers { /// Contains arguments to the `spawn_ads_client` function. struct SpawnAdsClient { - log: Logger, metrics_registry: Registry, node_id: String, management_servers: Vec, @@ -70,15 +68,12 @@ struct SpawnAdsClient { impl DynamicResourceManagers { pub(super) async fn new( - base_logger: Logger, xds_node_id: String, metrics_registry: Registry, filter_registry: FilterRegistry, management_servers: Vec, shutdown_rx: watch::Receiver<()>, ) -> Result { - let log = base_logger.new(o!("source" => "server::DynamicResourceManager")); - let (cluster_updates_tx, cluster_updates_rx) = Self::cluster_updates_channel(); let (filter_chain_updates_tx, filter_chain_updates_rx) = Self::filter_chain_updates_channel(); @@ -91,7 +86,6 @@ impl DynamicResourceManagers { let (execution_result_tx, execution_result_rx) = oneshot::channel::>(); Self::spawn_ads_client(SpawnAdsClient { - log: log.clone(), metrics_registry: metrics_registry.clone(), node_id: xds_node_id, management_servers, @@ -101,21 +95,13 @@ impl DynamicResourceManagers { shutdown_rx: shutdown_rx.clone(), })?; - let cluster_manager = ClusterManager::dynamic( - base_logger.new(o!("source" => "ClusterManager")), - &metrics_registry, - cluster_updates_rx, - shutdown_rx.clone(), - ) - .map_err(|err| InitializeError::Message(format!("{:?}", err)))?; - - let filter_manager = FilterManager::dynamic( - base_logger.new(o!("source" => "FilterManager")), - &metrics_registry, - filter_chain_updates_rx, - shutdown_rx, - ) - .map_err(|err| InitializeError::Message(format!("{:?}", err)))?; + let cluster_manager = + ClusterManager::dynamic(&metrics_registry, cluster_updates_rx, shutdown_rx.clone()) + .map_err(|err| InitializeError::Message(format!("{:?}", err)))?; + + let filter_manager = + FilterManager::dynamic(&metrics_registry, filter_chain_updates_rx, shutdown_rx) + .map_err(|err| InitializeError::Message(format!("{:?}", err)))?; Ok(Self { cluster_manager, @@ -129,7 +115,6 @@ impl DynamicResourceManagers { // as well as execution result after termination are sent on the passed-in channels. 
fn spawn_ads_client(args: SpawnAdsClient) -> Result<(), InitializeError> { let SpawnAdsClient { - log, metrics_registry, node_id, management_servers, @@ -139,7 +124,7 @@ impl DynamicResourceManagers { shutdown_rx, } = args; - let client = AdsClient::new(log.clone(), &metrics_registry).map_err(|err| { + let client = AdsClient::new(&metrics_registry).map_err(|err| { InitializeError::Message(format!("failed to initialize xDS client: {:?}", err)) })?; tokio::spawn(async move { @@ -154,7 +139,9 @@ impl DynamicResourceManagers { .await; execution_result_tx .send(result) - .map_err(|_err| warn!(log, "Failed to send ADS client execution result on channel")) + .map_err(|_err| { + tracing::warn!("Failed to send ADS client execution result on channel") + }) .ok(); }); @@ -178,7 +165,6 @@ mod tests { use super::DynamicResourceManagers; use crate::config::ManagementServer; use crate::filters::{manager::ListenerManagerArgs, FilterRegistry}; - use crate::test_utils::logger; use std::time::Duration; @@ -199,7 +185,6 @@ mod tests { let (_shutdown_tx, shutdown_rx) = watch::channel(()); DynamicResourceManagers::spawn_ads_client(SpawnAdsClient { - log: logger(), metrics_registry: Registry::default(), node_id: "id".into(), management_servers: vec![ManagementServer { diff --git a/src/proxy/sessions/session.rs b/src/proxy/sessions/session.rs index e3f5019e31..3132c55ee9 100644 --- a/src/proxy/sessions/session.rs +++ b/src/proxy/sessions/session.rs @@ -23,7 +23,6 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; -use slog::{debug, error, o, trace, warn, Logger}; use tokio::{ net::UdpSocket, select, @@ -45,7 +44,6 @@ type Result = std::result::Result; /// Session encapsulates a UDP stream session pub struct Session { - log: Logger, metrics: Metrics, proxy_metrics: ProxyMetrics, filter_manager: SharedFilterManager, @@ -131,16 +129,14 @@ pub struct SessionArgs { impl SessionArgs { /// Creates a new Session, and starts the process of receiving udp sockets /// from its ephemeral port from endpoint(s) - pub async fn into_session(self, base: &Logger) -> Result { - Session::new(base, self).await + pub async fn into_session(self) -> Result { + Session::new(self).await } } impl Session { /// internal constructor for a Session from SessionArgs - async fn new(base: &Logger, args: SessionArgs) -> Result { - let log = base - .new(o!("source" => "proxy::Session", "from" => args.from.clone(), "dest_address" => args.dest.address.clone())); + async fn new(args: SessionArgs) -> Result { let addr = (std::net::Ipv4Addr::UNSPECIFIED, 0); let socket = Arc::new(UdpSocket::bind(addr).await.map_err(Error::BindUdpSocket)?); let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); @@ -151,7 +147,6 @@ impl Session { let s = Session { metrics: args.metrics, proxy_metrics: args.proxy_metrics, - log, filter_manager: args.filter_manager, socket: socket.clone(), from: args.from, @@ -160,7 +155,8 @@ impl Session { expiration, shutdown_tx, }; - debug!(s.log, "Session created"); + + tracing::debug!(from = %s.from, dest = ?s.dest, "Session created"); s.metrics.sessions_total.inc(); s.metrics.active_sessions.inc(); @@ -176,7 +172,6 @@ impl Session { mut sender: mpsc::Sender, mut shutdown_rx: watch::Receiver<()>, ) { - let log = self.log.clone(); let from = self.from.clone(); let expiration = self.expiration.clone(); let filter_manager = self.filter_manager.clone(); @@ -186,19 +181,19 @@ impl Session { tokio::spawn(async move { let mut buf: Vec = vec![0; 65535]; loop { - debug!(log, "Awaiting incoming packet"); + tracing::debug!(from = %from, dest = 
?endpoint, "Awaiting incoming packet"); + select! { received = socket.recv_from(&mut buf) => { match received { - Err(err) => { + Err(error) => { metrics.rx_errors_total.inc(); - error!(log, "Error receiving packet"; "error" => %err); + tracing::error!(%error, %from, dest = ?endpoint, "Error receiving packet"); }, Ok((size, recv_addr)) => { metrics.rx_bytes_total.inc_by(size as u64); metrics.rx_packets_total.inc(); Session::process_recv_packet( - &log, &metrics, &mut sender, &expiration, @@ -215,7 +210,7 @@ impl Session { }; } _ = shutdown_rx.changed() => { - debug!(log, "Closing Session"); + tracing::debug!(%from, dest = ?endpoint, "Closing Session"); return; } }; @@ -238,7 +233,6 @@ impl Session { /// process_recv_packet processes a packet that is received by this session. async fn process_recv_packet( - log: &Logger, metrics: &Metrics, sender: &mut mpsc::Sender, expiration: &Arc, @@ -254,31 +248,30 @@ impl Session { timer, } = packet_ctx; - trace!(log, "Received packet"; "from" => from.clone(), - "endpoint_addr" => &endpoint.address, - "contents" => debug::bytes_to_string(packet)); + tracing::trace!(%from, dest = %endpoint.address, contents = %debug::bytes_to_string(packet), "Received packet"); - if let Err(err) = Session::do_update_expiration(expiration, ttl) { - warn!(log, "Error updating session expiration"; "error" => %err) + if let Err(error) = Session::do_update_expiration(expiration, ttl) { + tracing::warn!(%error, "Error updating session expiration") } let filter_chain = { let filter_manager_guard = filter_manager.read(); filter_manager_guard.get_filter_chain() }; - if let Some(response) = filter_chain.write(WriteContext::new( - endpoint, - from, - to.clone(), - packet.to_vec(), - )) { - if let Err(err) = sender - .send(UpstreamPacket::new(to, response.contents, timer)) - .await - { - metrics.rx_errors_total.inc(); - error!(log, "Error sending packet to channel"; "error" => %err); - } + + let write_result: futures::future::OptionFuture<_> = filter_chain + .write(WriteContext::new( + endpoint, + from, + to.clone(), + packet.to_vec(), + )) + .map(|response| sender.send(UpstreamPacket::new(to, response.contents, timer))) + .into(); + + if let Some(Err(error)) = write_result.await { + metrics.rx_errors_total.inc(); + tracing::error!(%error, "Error sending packet to channel"); } else { metrics.packets_dropped_total.inc(); } @@ -314,9 +307,10 @@ impl Session { /// Sends a packet to the Session's dest. 
pub async fn send(&self, buf: &[u8]) -> crate::Result> { - trace!(self.log, "Sending packet"; - "dest_address" => &self.dest.address, - "contents" => debug::bytes_to_string(buf)); + tracing::trace!( + dest_address = %self.dest.address, + contents = %debug::bytes_to_string(buf), + "Sending packet"); self.do_send(buf) .await @@ -349,10 +343,10 @@ impl Drop for Session { .observe(self.created_at.elapsed().as_secs() as f64); if let Err(error) = self.shutdown_tx.send(()) { - warn!(self.log, "Error sending session shutdown signal"; "error" => error.to_string()); + tracing::warn!(%error, "Error sending session shutdown signal"); } - debug!(self.log, "Session closed"; "from" => &self.from, "dest_address" => &self.dest.address); + tracing::debug!(from = %self.from, dest_address = %self.dest.address, "Session closed"); } } @@ -390,20 +384,17 @@ mod tests { let (send_packet, mut recv_packet) = mpsc::channel::(5); let registry = Registry::default(); - let sess = Session::new( - &t.log, - SessionArgs { - metrics: Metrics::new(®istry).unwrap(), - proxy_metrics: ProxyMetrics::new(®istry).unwrap(), - filter_manager: FilterManager::fixed(Arc::new( - FilterChain::new(vec![], ®istry).unwrap(), - )), - from: addr.clone(), - dest: endpoint, - sender: send_packet, - ttl: Duration::from_secs(20), - }, - ) + let sess = Session::new(SessionArgs { + metrics: Metrics::new(®istry).unwrap(), + proxy_metrics: ProxyMetrics::new(®istry).unwrap(), + filter_manager: FilterManager::fixed(Arc::new( + FilterChain::new(vec![], ®istry).unwrap(), + )), + from: addr.clone(), + dest: endpoint, + sender: send_packet, + ttl: Duration::from_secs(20), + }) .await .unwrap(); @@ -445,20 +436,17 @@ mod tests { let endpoint = Endpoint::new(addr.clone()); let registry = Registry::default(); - let session = Session::new( - &t.log, - SessionArgs { - metrics: Metrics::new(®istry).unwrap(), - proxy_metrics: ProxyMetrics::new(®istry).unwrap(), - filter_manager: FilterManager::fixed(Arc::new( - FilterChain::new(vec![], ®istry).unwrap(), - )), - from: addr, - dest: endpoint.clone(), - sender, - ttl: Duration::from_millis(1000), - }, - ) + let session = Session::new(SessionArgs { + metrics: Metrics::new(®istry).unwrap(), + proxy_metrics: ProxyMetrics::new(®istry).unwrap(), + filter_manager: FilterManager::fixed(Arc::new( + FilterChain::new(vec![], ®istry).unwrap(), + )), + from: addr, + dest: endpoint.clone(), + sender, + ttl: Duration::from_millis(1000), + }) .await .unwrap(); session.send(msg.as_bytes()).await.unwrap(); @@ -467,7 +455,6 @@ mod tests { #[tokio::test] async fn process_recv_packet() { - let t = TestHelper::default(); let registry = Registry::default(); let histogram = Histogram::with_opts(HistogramOpts::new("test", "test")).unwrap(); @@ -486,7 +473,6 @@ mod tests { // first test with no filtering let msg = "hello"; Session::process_recv_packet( - &t.log, &Metrics::new(&Registry::default()).unwrap(), &mut sender, &expiration, @@ -521,7 +507,6 @@ mod tests { let registry = Registry::default(); let chain = new_test_chain(®istry); Session::process_recv_packet( - &t.log, &Metrics::new(®istry).unwrap(), &mut sender, &expiration, @@ -558,20 +543,17 @@ mod tests { let (send_packet, _) = mpsc::channel::(5); let registry = Registry::default(); - let session = Session::new( - &t.log, - SessionArgs { - metrics: Metrics::new(®istry).unwrap(), - proxy_metrics: ProxyMetrics::new(®istry).unwrap(), - filter_manager: FilterManager::fixed(Arc::new( - FilterChain::new(vec![], ®istry).unwrap(), - )), - from: addr, - dest: endpoint, - sender: 
send_packet, - ttl: Duration::from_secs(10), - }, - ) + let session = Session::new(SessionArgs { + metrics: Metrics::new(®istry).unwrap(), + proxy_metrics: ProxyMetrics::new(®istry).unwrap(), + filter_manager: FilterManager::fixed(Arc::new( + FilterChain::new(vec![], ®istry).unwrap(), + )), + from: addr, + dest: endpoint, + sender: send_packet, + ttl: Duration::from_secs(10), + }) .await .unwrap(); @@ -587,20 +569,17 @@ mod tests { let endpoint = t.open_socket_and_recv_single_packet().await; let addr: EndpointAddress = endpoint.socket.local_addr().unwrap().into(); let registry = Registry::default(); - let session = Session::new( - &t.log, - SessionArgs { - metrics: Metrics::new(®istry).unwrap(), - proxy_metrics: ProxyMetrics::new(®istry).unwrap(), - filter_manager: FilterManager::fixed(Arc::new( - FilterChain::new(vec![], ®istry).unwrap(), - )), - from: addr.clone(), - dest: Endpoint::new(addr), - sender, - ttl: Duration::from_secs(10), - }, - ) + let session = Session::new(SessionArgs { + metrics: Metrics::new(®istry).unwrap(), + proxy_metrics: ProxyMetrics::new(®istry).unwrap(), + filter_manager: FilterManager::fixed(Arc::new( + FilterChain::new(vec![], ®istry).unwrap(), + )), + from: addr.clone(), + dest: Endpoint::new(addr), + sender, + ttl: Duration::from_secs(10), + }) .await .unwrap(); session.send(b"hello").await.unwrap(); @@ -617,20 +596,17 @@ mod tests { let endpoint = t.open_socket_and_recv_single_packet().await; let addr: EndpointAddress = endpoint.socket.local_addr().unwrap().into(); let registry = Registry::default(); - let session = Session::new( - &t.log, - SessionArgs { - metrics: Metrics::new(®istry).unwrap(), - proxy_metrics: ProxyMetrics::new(®istry).unwrap(), - filter_manager: FilterManager::fixed(Arc::new( - FilterChain::new(vec![], ®istry).unwrap(), - )), - from: addr.clone(), - dest: Endpoint::new(addr), - sender: send_packet, - ttl: Duration::from_secs(10), - }, - ) + let session = Session::new(SessionArgs { + metrics: Metrics::new(®istry).unwrap(), + proxy_metrics: ProxyMetrics::new(®istry).unwrap(), + filter_manager: FilterManager::fixed(Arc::new( + FilterChain::new(vec![], ®istry).unwrap(), + )), + from: addr.clone(), + dest: Endpoint::new(addr), + sender: send_packet, + ttl: Duration::from_secs(10), + }) .await .unwrap(); diff --git a/src/proxy/sessions/session_manager.rs b/src/proxy/sessions/session_manager.rs index 17c22e118a..798b80d4d3 100644 --- a/src/proxy/sessions/session_manager.rs +++ b/src/proxy/sessions/session_manager.rs @@ -18,7 +18,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use slog::{debug, warn, Logger}; use tokio::sync::{watch, RwLock, RwLockReadGuard, RwLockWriteGuard}; use crate::proxy::sessions::{Session, SessionKey}; @@ -37,11 +36,11 @@ const SESSION_EXPIRY_POLL_INTERVAL: u64 = 60; pub struct SessionManager(Sessions); impl SessionManager { - pub fn new(log: Logger, shutdown_rx: watch::Receiver<()>) -> Self { + pub fn new(shutdown_rx: watch::Receiver<()>) -> Self { let poll_interval = Duration::from_secs(SESSION_EXPIRY_POLL_INTERVAL); let sessions: Sessions = Arc::new(RwLock::new(HashMap::new())); - Self::run_prune_sessions(log.clone(), sessions.clone(), poll_interval, shutdown_rx); + Self::run_prune_sessions(sessions.clone(), poll_interval, shutdown_rx); Self(sessions) } @@ -59,7 +58,6 @@ impl SessionManager { /// Pruning will occur ~ every interval period. 
So the timeout expiration may sometimes /// exceed the expected, but we don't have to write lock the Sessions map as often to clean up. fn run_prune_sessions( - log: Logger, mut sessions: Sessions, poll_interval: Duration, mut shutdown_rx: watch::Receiver<()>, @@ -70,12 +68,12 @@ impl SessionManager { loop { tokio::select! { _ = shutdown_rx.changed() => { - debug!(log, "Exiting Prune Sessions due to shutdown signal."); + tracing::debug!("Exiting Prune Sessions due to shutdown signal."); break; } _ = interval.tick() => { - debug!(log, "Attempting to Prune Sessions"); - Self::prune_sessions(&log, &mut sessions).await; + tracing::debug!("Attempting to Prune Sessions"); + Self::prune_sessions(&mut sessions).await; } } @@ -86,11 +84,11 @@ impl SessionManager { /// Removes expired [`Session`]s from `sessions`. This should be run /// regularly such as on a time interval. This will only write lock /// `sessions` if it first finds expired sessions. - async fn prune_sessions(log: &Logger, sessions: &mut Sessions) { + async fn prune_sessions(sessions: &mut Sessions) { let now = if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) { now.as_secs() } else { - warn!(log, "Failed to get current time when pruning sessions"); + tracing::warn!("Failed to get current time when pruning sessions"); return; }; @@ -130,7 +128,6 @@ mod tests { UpstreamPacket, }, }, - test_utils::TestHelper, }; use super::SessionManager; @@ -144,7 +141,6 @@ mod tests { #[tokio::test] async fn run_prune_sessions() { - let t = TestHelper::default(); let sessions = Arc::new(RwLock::new(HashMap::new())); let (from, to) = address_pair(); let (send, _recv) = mpsc::channel::(1); @@ -157,12 +153,7 @@ mod tests { //let config = Arc::new(config_with_dummy_endpoint().build()); //let server = Builder::from(config).validate().unwrap().build(); - SessionManager::run_prune_sessions( - t.log.clone(), - sessions.clone(), - poll_interval, - shutdown_rx, - ); + SessionManager::run_prune_sessions(sessions.clone(), poll_interval, shutdown_rx); let key = SessionKey::from((from.clone(), to.clone())); @@ -181,10 +172,7 @@ mod tests { sender: send, ttl, }; - sessions.insert( - key.clone(), - session_args.into_session(&t.log).await.unwrap(), - ); + sessions.insert(key.clone(), session_args.into_session().await.unwrap()); } // session map should be the same since, we haven't passed expiry @@ -219,7 +207,6 @@ mod tests { #[tokio::test] async fn prune_sessions() { - let t = TestHelper::default(); let mut sessions: Sessions = Arc::new(RwLock::new(HashMap::new())); let (from, to) = address_pair(); let (send, _recv) = mpsc::channel::(1); @@ -242,10 +229,7 @@ mod tests { sender: send, ttl, }; - sessions.insert( - key.clone(), - session_args.into_session(&t.log).await.unwrap(), - ); + sessions.insert(key.clone(), session_args.into_session().await.unwrap()); } // Insert key. @@ -256,7 +240,7 @@ mod tests { } // session map should be the same since, we haven't passed expiry - SessionManager::prune_sessions(&t.log, &mut sessions).await; + SessionManager::prune_sessions(&mut sessions).await; { let map = sessions.read().await; assert!(map.contains_key(&key)); @@ -266,7 +250,7 @@ mod tests { // Wait until the key has expired. 
tokio::time::sleep_until(tokio::time::Instant::now().add(ttl)).await; - SessionManager::prune_sessions(&t.log, &mut sessions).await; + SessionManager::prune_sessions(&mut sessions).await; { let map = sessions.read().await; assert!( diff --git a/src/runner.rs b/src/runner.rs index 0fdd8f5ad7..61428c7129 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -22,7 +22,6 @@ use tracing::{debug, info, span, Level}; use crate::{ config::Config, filters::{DynFilterFactory, FilterRegistry, FilterSet}, - proxy::logger, proxy::Builder, Result, }; @@ -42,12 +41,10 @@ pub async fn run_with_config( config: Arc, filter_factories: impl IntoIterator, ) -> Result<()> { - let base_log = logger(); // TODO: remove this, when tracing is replaceed in Server let span = span!(Level::INFO, "source::run"); let _enter = span.enter(); let server = Builder::from(config) - .with_log(base_log) .with_filter_registry(FilterRegistry::new(FilterSet::default_with( filter_factories.into_iter(), ))) diff --git a/src/test_utils.rs b/src/test_utils.rs index 3b1a0e3779..a37644e287 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -19,8 +19,6 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::str::from_utf8; use std::sync::Arc; -use slog::{o, warn, Drain, Logger}; -use slog_term::{FullFormat, PlainSyncDecorator}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; @@ -79,16 +77,8 @@ impl Filter for TestFilter { } } -// logger returns a standard out, non structured terminal logger, suitable for using in tests, -// since it's more human readable. -pub fn logger() -> Logger { - let plain = PlainSyncDecorator::new(std::io::stdout()); - let drain = FullFormat::new(plain).build().fuse(); - Logger::root(drain, o!()) -} - +#[derive(Default)] pub struct TestHelper { - pub log: Logger, /// Channel to subscribe to, and trigger the shutdown of created resources. 
shutdown_ch: Option<(watch::Sender<()>, watch::Receiver<()>)>, server_shutdown_tx: Vec>>, @@ -104,7 +94,6 @@ pub struct OpenSocketRecvPacket { impl Drop for TestHelper { fn drop(&mut self) { - let log = self.log.clone(); for shutdown_tx in self .server_shutdown_tx .iter_mut() @@ -113,10 +102,10 @@ impl Drop for TestHelper { { shutdown_tx .send(()) - .map_err(|err| { - warn!( - log, - "Failed to send server shutdown over channel"; "error" => %err + .map_err(|error| { + tracing::warn!( + %error, + "Failed to send server shutdown over channel" ) }) .ok(); @@ -128,16 +117,6 @@ impl Drop for TestHelper { } } -impl Default for TestHelper { - fn default() -> Self { - TestHelper { - log: logger(), - shutdown_ch: None, - server_shutdown_tx: vec![], - } - } -} - impl TestHelper { /// Opens a new socket bound to an ephemeral port pub async fn create_socket(&self) -> Arc { @@ -168,7 +147,6 @@ impl TestHelper { ) -> (mpsc::Receiver, Arc) { let (packet_tx, packet_rx) = mpsc::channel::(10); let socket = self.create_socket().await; - let log = self.log.clone(); let mut shutdown_rx = self.get_shutdown_subscriber().await; let socket_recv = socket.clone(); tokio::spawn(async move { @@ -180,8 +158,8 @@ impl TestHelper { let str = from_utf8(&buf[..size]).unwrap().to_string(); match packet_tx.send(str).await { Ok(_) => {} - Err(err) => { - warn!(log, "recv_multiple_packets: recv_chan dropped"; "error" => %err); + Err(error) => { + tracing::warn!(target: "recv_multiple_packets", %error, "recv_chan dropped"); return; } }; diff --git a/src/xds/ads_client.rs b/src/xds/ads_client.rs index 98cba36b34..32c6749931 100644 --- a/src/xds/ads_client.rs +++ b/src/xds/ads_client.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; use prometheus::{Registry, Result as MetricsResult}; -use slog::{debug, error, info, o, warn, Logger}; use tokio::{ sync::{ mpsc::{self, error::SendError}, @@ -70,15 +69,13 @@ pub const UPDATES_CHANNEL_BUFFER_SIZE: usize = 1; /// AdsClient is a client that can talk to an XDS server using the ADS protocol. pub(crate) struct AdsClient { - log: Logger, metrics: Metrics, } impl AdsClient { - pub fn new(base_logger: Logger, metrics_registry: &Registry) -> MetricsResult { - let log = base_logger.new(o!("source" => "xds::AdsClient")); + pub fn new(metrics_registry: &Registry) -> MetricsResult { let metrics = Metrics::new(metrics_registry)?; - Ok(Self { log, metrics }) + Ok(Self { metrics }) } /// Continuously tracks CDS and EDS resources on an ADS server, @@ -91,7 +88,6 @@ impl AdsClient { listener_manager_args: ListenerManagerArgs, mut shutdown_rx: watch::Receiver<()>, ) -> Result<()> { - let log = self.log; let metrics = self.metrics; let mut server_iter = management_servers.iter().cycle(); @@ -100,17 +96,17 @@ impl AdsClient { let mut backoff = ExponentialBackoff::new(std::time::Duration::from_millis(500)); match error { - RpcSessionError::NonRecoverable(msg, err) => { - error!(log, "{}\n{}", msg, err); + RpcSessionError::NonRecoverable(message, error) => { + tracing::error!(%message, %error); RetryPolicy::Break } - RpcSessionError::InitialConnect(ref err) => { - error!(log, "Unable to connect to the XDS server"; "error" => %err); + RpcSessionError::InitialConnect(ref error) => { + tracing::error!(%error, "Unable to connect to the XDS server"); // Do not retry if this is an invalid URL error that we cannot recover from. 
// Need to use {:?} as the Display output only returns 'transport error' - let err_description = format!("{:?}", err); + let err_description = format!("{:?}", error); if err_description.to_lowercase().contains("invalid url") { RetryPolicy::Break } else { @@ -119,7 +115,7 @@ impl AdsClient { } RpcSessionError::Receive(ref status) => { - error!(log, "Failed to receive response from XDS server"; "status" => #?status); + tracing::error!(status = ?status, "Failed to receive response from XDS server"); RetryPolicy::Delay(backoff.delay(attempt, &error)) } } @@ -129,13 +125,10 @@ impl AdsClient { let handle = tryhard::retry_fn(|| { let (discovery_req_tx, discovery_req_rx) = mpsc::channel::(UPDATES_CHANNEL_BUFFER_SIZE); - let cluster_manager = ClusterManager::new( - log.clone(), - cluster_updates_tx.clone(), - discovery_req_tx.clone(), - ); + let cluster_manager = + ClusterManager::new(cluster_updates_tx.clone(), discovery_req_tx.clone()); let listener_manager = - ListenerManager::new(log.clone(), listener_manager_args.clone(), discovery_req_tx); + ListenerManager::new(listener_manager_args.clone(), discovery_req_tx); let resource_handlers = ResourceHandlers { cluster_manager, @@ -144,7 +137,6 @@ impl AdsClient { RpcSession { discovery_req_rx, - log: log.clone(), metrics: metrics.clone(), node_id: node_id.clone(), // server_iter is guaranteed to always have at least one entry. @@ -162,7 +154,7 @@ impl AdsClient { tokio::select! { result = handle => result.map(drop).map_err(|error| eyre::eyre!(error)), _ = shutdown_rx.changed() => { - info!(log, "Stopping client execution - received shutdown signal."); + tracing::info!("Stopping client execution - received shutdown signal."); Ok(()) }, } @@ -172,7 +164,6 @@ impl AdsClient { /// Represents the receiving side of the RPC channel. pub struct RpcReceiver { client: AggregatedDiscoveryServiceClient, - log: Logger, metrics: Metrics, resource_handlers: ResourceHandlers, rpc_rx: mpsc::Receiver, @@ -201,7 +192,7 @@ impl RpcReceiver { let response = match response { Ok(None) => { // No more messages on the connection. - info!(self.log, "Exiting receive loop - response stream closed."); + tracing::info!("Exiting receive loop - response stream closed."); break Ok(()) }, Err(err) => break Err(RpcSessionError::Receive(err)), @@ -211,12 +202,12 @@ impl RpcReceiver { self.metrics.update_attempt_total.inc(); if let Err(url) = self.resource_handlers.handle_discovery_response(response).await { self.metrics.update_failure_total.inc(); - error!(self.log, "Unexpected resource"; "type" => url); + tracing::error!(r#type = %url, "Unexpected resource"); } } _ = self.shutdown_rx.changed() => { - info!(self.log, "Exiting receive loop - received shutdown signal"); + tracing::info!("Exiting receive loop - received shutdown signal"); break Ok(()) } } @@ -233,7 +224,6 @@ impl RpcReceiver { /// Represents a complete aDS gRPC session. pub struct RpcSession { discovery_req_rx: mpsc::Receiver, - log: Logger, metrics: Metrics, node_id: String, addr: String, @@ -259,7 +249,6 @@ impl RpcSession { // Spawn a task that runs the receive loop. 
let mut recv_loop_join_handle = RpcReceiver { client, - log: self.log.clone(), metrics: self.metrics.clone(), resource_handlers: self.resource_handlers, rpc_rx, @@ -268,7 +257,6 @@ impl RpcSession { .run(); let sender = RpcSender { - log: self.log.clone(), metrics: self.metrics.clone(), rpc_tx, }; @@ -297,7 +285,7 @@ impl RpcSession { Box::new(err)) )?; } else { - info!(self.log, "Exiting send loop"); + tracing::info!("Exiting send loop"); break; } } @@ -316,7 +304,6 @@ impl RpcSession { } struct RpcSender { - log: Logger, metrics: Metrics, rpc_tx: mpsc::Sender, } @@ -367,7 +354,7 @@ impl RpcSender { self.metrics.requests_total.inc(); - debug!(self.log, "Sending rpc discovery"; "request" => #?req); + tracing::debug!(request = ?req, "Sending rpc discovery"); self.rpc_tx.send(req).await } @@ -416,7 +403,6 @@ enum RpcSessionError { // Send a Discovery request with the provided arguments on the channel. pub(super) async fn send_discovery_req( - log: Logger, type_url: &'static str, version_info: String, response_nonce: String, @@ -438,11 +424,11 @@ pub(super) async fn send_discovery_req( }), }) .await - .map_err(|err| { - warn!( - log, - "Failed to send discovery request"; - "type" => %type_url, "error" => %err + .map_err(|error| { + tracing::warn!( + r#type = %type_url, + %error, + "Failed to send discovery request" ) }) // ok is safe here since an error would mean that we've dropped/closed the receiving @@ -456,7 +442,6 @@ mod tests { use super::AdsClient; use crate::config::ManagementServer; use crate::filters::FilterRegistry; - use crate::proxy::logger; use crate::xds::ads_client::ListenerManagerArgs; use crate::xds::envoy::service::discovery::v3::DiscoveryRequest; use crate::xds::google::rpc::Status as GrpcStatus; @@ -475,7 +460,7 @@ mod tests { let (_shutdown_tx, shutdown_rx) = watch::channel::<()>(()); let (cluster_updates_tx, _) = mpsc::channel(10); let (filter_chain_updates_tx, _) = mpsc::channel(10); - let run = AdsClient::new(logger(), &Registry::default()).unwrap().run( + let run = AdsClient::new(&Registry::default()).unwrap().run( "test-id".into(), vec![ManagementServer { address: "localhost:18000".into(), @@ -502,7 +487,6 @@ mod tests { for error_message in vec![Some("Boo!".into()), None] { super::send_discovery_req( - logger(), CLUSTER_TYPE, "101".into(), "nonce-101".into(), diff --git a/src/xds/cluster.rs b/src/xds/cluster.rs index 1f9e3f6905..2f89ea475a 100644 --- a/src/xds/cluster.rs +++ b/src/xds/cluster.rs @@ -21,7 +21,6 @@ use std::{ use bytes::Bytes; use prost::Message; -use slog::{debug, o, warn, Logger}; use tokio::sync::mpsc; use crate::{ @@ -46,8 +45,6 @@ use crate::{ /// resources and exposes the results to the caller whenever /// the result changes. pub(crate) struct ClusterManager { - log: Logger, - // Send discovery requests ACKs/NACKs to the server. discovery_req_tx: mpsc::Sender, @@ -69,12 +66,10 @@ impl ClusterManager { /// ACKs/NACKs are sent on the provided channel to be forwarded to the XDS /// server. pub(in crate::xds) fn new( - base: Logger, cluster_updates_tx: mpsc::Sender>, discovery_req_tx: mpsc::Sender, ) -> Self { ClusterManager { - log: base.new(o!("source" => "xds::ClusterManager")), discovery_req_tx, cluster_updates_tx, clusters: HashMap::new(), @@ -85,8 +80,7 @@ impl ClusterManager { /// Processes a CDS response and updates its cluster view if needed. /// This method is called upon receiving a `CDS` `DiscoveryResponse` from the XDS server. 
     pub(in crate::xds) async fn on_cluster_response(&mut self, response: DiscoveryResponse) {
-        debug!(
-            self.log,
+        tracing::debug!(
             "{}: received response containing {} resource(s)",
             CLUSTER_TYPE,
             response.resources.len()
@@ -170,8 +164,7 @@ impl ClusterManager {
         &mut self,
         response: DiscoveryResponse,
     ) {
-        debug!(
-            self.log,
+        tracing::debug!(
             "{}: received response containing {} resource(s)",
             ENDPOINT_TYPE,
             response.resources.len()
@@ -213,9 +206,9 @@ impl ClusterManager {
                 None => {
                     // Got an endpoint that we don't have a cluster for. This likely means that
                     // the cluster has been deleted since ADS handles resource ordering.
-                    warn!(
-                        self.log,
-                        "Got endpoint for non-existing cluster"; "name" => assignment.cluster_name
+                    tracing::warn!(
+                        name = %assignment.cluster_name,
+                        "Got endpoint for non-existing cluster",
                     );
                 }
             }
@@ -233,7 +226,7 @@ impl ClusterManager {
             .send(self.clusters.clone())
             .await
             .map_err(|err| {
-                warn!(self.log, "Failed to send cluster updates downstream");
+                tracing::warn!("Failed to send cluster updates downstream");
                 err
             })
             // ok is safe here because an error can only be due to downstream dropping
@@ -290,7 +283,7 @@ impl ClusterManager {
                 .try_into()?;
             endpoints.push(Endpoint::with_metadata(
-                dbg!(address),
+                address,
                 metadata
                     .map(MetadataView::try_from)
                     .transpose()?
@@ -348,7 +341,6 @@ impl ClusterManager {
         resource_names: Vec,
     ) {
         send_discovery_req(
-            self.log.clone(),
             type_url,
             version_info,
             response_nonce,
@@ -364,7 +356,6 @@ impl ClusterManager {
 mod tests {
     use super::{ClusterManager, ProxyCluster};
     use crate::endpoint::{Endpoint as ProxyEndpoint, EndpointAddress};
-    use crate::test_utils::logger;
     use crate::xds::envoy::config::cluster::v3::{cluster::ClusterDiscoveryType, Cluster};
     use crate::xds::envoy::config::core::v3::{
         address, socket_address::PortSpecifier, Address, Metadata, SocketAddress,
@@ -392,7 +383,7 @@ mod tests {
         let (cluster_updates_tx, _) = mpsc::channel::(100);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         let initial_names = vec!["a".into()];
         cm.on_cluster_response(cluster_discovery_response("1", "2", initial_names.clone()))
@@ -434,7 +425,7 @@ mod tests {
         let (cluster_updates_tx, _) = mpsc::channel::(100);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         let names = vec!["a".into(), "b".into()];
         cm.on_cluster_response(cluster_discovery_response("3", "6", names.clone()))
@@ -518,7 +509,7 @@ mod tests {
         let (cluster_updates_tx, _) = mpsc::channel::(100);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         let names = vec!["a".into(), "b".into()];
         cm.on_cluster_response(cluster_discovery_response("3", "6", names.clone()))
@@ -554,7 +545,7 @@ mod tests {
         let (cluster_updates_tx, _) = mpsc::channel::(100);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         let initial_names = vec!["a".into()];
         cm.on_cluster_response(cluster_discovery_response("1", "2", initial_names.clone()))
@@ -585,7 +576,7 @@ mod tests {
         let (cluster_updates_tx, _) = mpsc::channel::(100);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         cm.on_cluster_response(cluster_discovery_response(
             "1",
@@ -635,7 +626,7 @@ mod tests {
         let (cluster_updates_tx, mut cluster_updates_rx) = mpsc::channel::(100);
         let (discovery_req_tx, _) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         cm.on_cluster_response(cluster_discovery_response(
             "1",
@@ -698,7 +689,7 @@ mod tests {
         let (cluster_updates_tx, mut cluster_updates_rx) = mpsc::channel::(100);
         let (discovery_req_tx, _) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         cm.on_cluster_response(cluster_discovery_response(
             "1",
@@ -754,7 +745,7 @@ mod tests {
         let (cluster_updates_tx, mut cluster_updates_rx) = mpsc::channel::(100);
         let (discovery_req_tx, _) = mpsc::channel::(100);
-        let mut cm = ClusterManager::new(logger(), cluster_updates_tx, discovery_req_tx);
+        let mut cm = ClusterManager::new(cluster_updates_tx, discovery_req_tx);
         cm.on_cluster_response(cluster_discovery_response_with_update(
             "1",
diff --git a/src/xds/listener.rs b/src/xds/listener.rs
index 0fc87dae76..0bbafb2653 100644
--- a/src/xds/listener.rs
+++ b/src/xds/listener.rs
@@ -29,15 +29,12 @@ use crate::xds::ads_client::send_discovery_req;
 use bytes::Bytes;
 use prometheus::Registry;
 use prost::Message;
-use slog::{debug, warn, Logger};
 use tokio::sync::mpsc;
 /// Tracks FilterChain resources on the LDS DiscoveryResponses and
 /// instantiates a corresponding proxy filter chain and exposes it
 /// to the caller whenever the filter chain changes.
 pub(crate) struct ListenerManager {
-    log: Logger,
-
     metrics_registry: Registry,
     // Registry to lookup filter factories by name.
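Aside, not part of the patch: the removed `log: Logger` fields and `o!("source" => …)` child loggers attached a per-component `source` field to every record, and the converted call sites above simply drop that context. If it is still wanted, the usual tracing substitute is a span held while the component does its work. A minimal sketch, assuming only the `tracing` crate with default features (names here are illustrative, not taken from this codebase):

    // Hypothetical helper: a span plays the role of the old child logger, so every
    // event recorded while the guard is live carries the `source` field.
    fn process_cluster_update() {
        let span = tracing::info_span!("cluster_manager", source = "xds::ClusterManager");
        let _guard = span.enter();

        // Roughly equivalent to the old `debug!(log, "Received a cluster update.")`.
        tracing::debug!("Received a cluster update.");
    }
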
@@ -52,12 +49,10 @@ pub(crate) struct ListenerManager {
 impl ListenerManager {
     pub(in crate::xds) fn new(
-        log: Logger,
         args: ListenerManagerArgs,
         discovery_req_tx: mpsc::Sender,
     ) -> Self {
         ListenerManager {
-            log,
             metrics_registry: args.metrics_registry,
             filter_registry: args.filter_registry,
             discovery_req_tx,
@@ -66,8 +61,7 @@ impl ListenerManager {
     }
     pub(in crate::xds) async fn on_listener_response(&mut self, response: DiscoveryResponse) {
-        debug!(
-            self.log,
+        tracing::debug!(
             "{}: received response containing {} resource(s)",
             LISTENER_TYPE,
             response.resources.len()
@@ -84,7 +78,7 @@ impl ListenerManager {
             .send(Arc::new(filter_chain))
             .await
             .map_err(|err| {
-                warn!(self.log, "Failed to send filter chain update on channel");
+                tracing::warn!("Failed to send filter chain update on channel");
                 err
             })
             // ok is safe here because an error can only be due to the consumer dropping
@@ -174,7 +168,6 @@ impl ListenerManager {
         resource_names: Vec,
     ) {
         send_discovery_req(
-            self.log.clone(),
             type_url,
             version_info,
             response_nonce,
@@ -190,7 +183,6 @@ impl ListenerManager {
 mod tests {
     use super::ListenerManager;
     use crate::filters::{manager::ListenerManagerArgs, prelude::*};
-    use crate::test_utils::logger;
     use crate::xds::envoy::config::listener::v3::{
         filter::ConfigType, Filter as LdsFilter, FilterChain as LdsFilterChain, Listener,
     };
@@ -282,7 +274,6 @@ mod tests {
         let (filter_chain_updates_tx, mut filter_chain_updates_rx) = mpsc::channel(10);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel(10);
         let mut manager = ListenerManager::new(
-            logger(),
             ListenerManagerArgs::new(
                 Registry::default(),
                 filter_registry,
@@ -387,7 +378,6 @@ mod tests {
         let (filter_chain_updates_tx, mut filter_chain_updates_rx) = mpsc::channel(10);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel(10);
         let mut manager = ListenerManager::new(
-            logger(),
             ListenerManagerArgs::new(
                 Registry::default(),
                 filter_registry,
@@ -496,7 +486,6 @@ mod tests {
         let (filter_chain_updates_tx, _filter_chain_updates_rx) = mpsc::channel(10);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel(10);
         let mut manager = ListenerManager::new(
-            logger(),
             ListenerManagerArgs::new(
                 Registry::default(),
                 filter_registry,
@@ -604,7 +593,6 @@ mod tests {
         let (filter_chain_updates_tx, _filter_chain_updates_rx) = mpsc::channel(10);
         let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel(10);
         let mut manager = ListenerManager::new(
-            logger(),
             ListenerManagerArgs::new(
                 Registry::default(),
                 FilterRegistry::new(FilterSet::default()),
diff --git a/tests/compress.rs b/tests/compress.rs
index 7906cd7125..fdcb77e3bc 100644
--- a/tests/compress.rs
+++ b/tests/compress.rs
@@ -16,7 +16,6 @@
 use std::net::SocketAddr;
-use slog::info;
 use tokio::time::{timeout, Duration};
 use quilkin::{
@@ -76,7 +75,7 @@ on_write: DECOMPRESS
     // game_client
     let local_addr: SocketAddr = format!("127.0.0.1:{}", client_port).parse().unwrap();
-    info!(t.log, "Sending hello"; "address" => local_addr);
+    tracing::info!(address = %local_addr, "Sending hello");
     tx.send_to(b"hello", &local_addr).await.unwrap();
     let expected = timeout(Duration::from_secs(5), rx.recv())
diff --git a/tests/filters.rs b/tests/filters.rs
index fb8149555f..c3cbcb4cc2 100644
--- a/tests/filters.rs
+++ b/tests/filters.rs
@@ -20,7 +20,6 @@ use std::{
 };
 use serde_yaml::{Mapping, Value};
-use slog::info;
 use quilkin::{
     config::{Builder as ConfigBuilder, Filter},
@@ -86,7 +85,7 @@ async fn test_filter() {
     // game_client
     let local_addr =
         SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
-    info!(t.log, "Sending hello"; "address" => local_addr);
+    tracing::info!(address = %local_addr, "Sending hello");
     socket.send_to(b"hello", &local_addr).await.unwrap();
     let result = recv_chan.recv().await.unwrap();
@@ -157,7 +156,7 @@ async fn debug_filter() {
     // game client
     let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
-    info!(t.log, "Sending hello"; "address" => local_addr);
+    tracing::info!(address = %local_addr, "Sending hello");
     socket.send_to(b"hello", &local_addr).await.unwrap();
     // since the debug filter doesn't change the data, it should be exactly the same
diff --git a/tests/firewall.rs b/tests/firewall.rs
index 87c8995524..3d23ba5452 100644
--- a/tests/firewall.rs
+++ b/tests/firewall.rs
@@ -18,7 +18,6 @@ use quilkin::config::{Builder, Filter};
 use quilkin::endpoint::Endpoint;
 use quilkin::filters::firewall;
 use quilkin::test_utils::TestHelper;
-use slog::info;
 use std::net::SocketAddr;
 use tokio::sync::oneshot::Receiver;
 use tokio::time::{timeout, Duration};
@@ -99,7 +98,7 @@ async fn test(t: &mut TestHelper, server_port: u16, yaml: &str) -> Receiver
-    info!(t.log, "Config"; "config" => yaml.as_str());
+    tracing::info!(config = yaml.as_str(), "Config");
     let server_config = Builder::empty()
         .with_port(server_port)
@@ -114,7 +113,7 @@ async fn test(t: &mut TestHelper, server_port: u16, yaml: &str) -> Receiver
-    info!(t.log, "Sending hello"; "from" => client_addr, "address" => local_addr);
+    tracing::info!(from = %client_addr, address = %local_addr, "Sending hello");
     recv.socket.send_to(b"hello", &local_addr).await.unwrap();
     recv.packet_rx
diff --git a/tests/metrics.rs b/tests/metrics.rs
index 3f6154b22c..f5a188094f 100644
--- a/tests/metrics.rs
+++ b/tests/metrics.rs
@@ -19,8 +19,6 @@ use std::{
     sync::Arc,
 };
-use slog::info;
-
 use quilkin::{
     config::{Admin, Builder as ConfigBuilder},
     endpoint::Endpoint,
@@ -64,7 +62,7 @@ async fn metrics_server() {
     // game_client
     let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
-    info!(t.log, "Sending hello"; "address" => local_addr);
+    tracing::info!(address = %local_addr, "Sending hello");
     socket.send_to(b"hello", &local_addr).await.unwrap();
     let _ = recv_chan.recv().await.unwrap();
diff --git a/tests/xds.rs b/tests/xds.rs
index ed0951881d..87d0866774 100644
--- a/tests/xds.rs
+++ b/tests/xds.rs
@@ -134,15 +134,9 @@ use quilkin_proto::extensions::filters::concatenate_bytes::v1alpha1::{
     ConcatenateBytes,
 };
-use quilkin::{
-    config::Config,
-    endpoint::EndpointAddress,
-    test_utils::{logger, TestHelper},
-    Builder,
-};
+use quilkin::{config::Config, endpoint::EndpointAddress, test_utils::TestHelper, Builder};
 use prost::Message;
-use slog::{info, o, Logger};
 use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -159,7 +153,6 @@ const LISTENER_TYPE: &str = "type.googleapis.com/envoy.config.listener.v3.Listen
 // forwards DiscoveryResponse(s) to the client. A rx chan is passed in upon creation
 // and can be used by the test to drive the DiscoveryResponses sent by the server to the client.
 struct ControlPlane {
-    log: Logger,
     source_discovery_response_rx: tokio::sync::Mutex>>>,
     shutdown_rx: watch::Receiver<()>,
@@ -183,7 +176,6 @@ impl ADS for ControlPlane {
             .take()
             .unwrap();
-        let log = self.log.clone();
         let mut shutdown_rx = self.shutdown_rx.clone();
         let (discovery_response_tx, discovery_response_rx) = mpsc::channel(1);
         tokio::spawn(async move {
@@ -195,7 +187,7 @@ impl ADS for ControlPlane {
                     source_response = source_discovery_response_rx.recv() => {
                         match source_response {
                             None => {
-                                info!(log, "Stopping updates to client: source was dropped");
+                                tracing::info!("Stopping updates to client: source was dropped");
                                 return;
                             },
                             Some(result) => {
@@ -234,21 +226,15 @@ dynamic:
 ";
     let config: Arc = Arc::new(serde_yaml::from_str(config).unwrap());
-    let server = Builder::from(config)
-        .with_log(logger())
-        .validate()
-        .unwrap()
-        .build();
+    let server = Builder::from(config).validate().unwrap().build();
     let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
     let (discovery_response_tx, discovery_response_rx) = mpsc::channel(1);
     let mut control_plane_shutdown_rx = shutdown_rx.clone();
-    let log = t.log.new(o!("source" => "control-plane"));
     tokio::spawn(async move {
        let server = ADSServer::new(ControlPlane {
            source_discovery_response_rx: tokio::sync::Mutex::new(Some(discovery_response_rx)),
-            log,
            shutdown_rx: control_plane_shutdown_rx.clone(),
        });
        let server = Server::builder().add_service(server);
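For reference, the field syntax used in the converted call sites above maps onto slog's as follows: slog's `"key" => value` pairs after the message become `key = value` fields before the message, `%value` records the field with `Display`, `?value` with `Debug` (replacing slog's `#?value`), and keywords need a raw identifier (hence `r#type = %url`). A minimal, self-contained sketch of the new form, assuming some subscriber such as `tracing_subscriber::fmt()` is installed by the binary or test harness (that setup is not shown in this part of the patch, and without a subscriber these events are silently dropped):

    use std::net::{Ipv4Addr, SocketAddr};

    fn main() {
        let local_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 7000));

        // Display capture with `%`, message last.
        tracing::info!(address = %local_addr, "Sending hello");

        // Debug capture with `?`, the replacement for slog's `#?req`.
        tracing::debug!(request = ?local_addr, "Sending rpc discovery");
    }
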