Skip to content

Commit

Permalink
Update slog and update logging to be consistent (#217)
Browse files Browse the repository at this point in the history
* Update slog and update logging to be consistent

Updated slog, so we can use nicer formatting options, such as #? and #%,
and then also take the opportunity to make the logging consistent with
capitalisation and use of structural logging.

* Review updates and merge conflicts.

* Rename "addr" to "address" to be clearer.
  • Loading branch information
markmandel authored Mar 23, 2021
1 parent 826abf9 commit 35975f1
Show file tree
Hide file tree
Showing 17 changed files with 47 additions and 53 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ prost-types = "0.7.0"
serde = { version = "1.0.104", features = ["derive"] }
serde_yaml = "0.8.11"
serde_json = "1.0.60"
slog = "2.5.2"
slog-async = "2.4.0"
slog = "2.7.0"
slog-async = "2.6.0"
slog-json = "2.3.0"
tokio = { version = "1.1.0", features = ["rt-multi-thread", "signal", "test-util", "parking_lot"] }
slog-term = "2.5.0"
Expand Down
2 changes: 1 addition & 1 deletion src/extensions/filters/capture_bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Filter for CaptureBytes {
if self.metrics.packets_dropped_total.get() % 1000 == 0 {
warn!(
self.log,
"packets are being dropped due to their length being less than {} bytes",
"Packets are being dropped due to their length being less than {} bytes",
self.size; "count" => self.metrics.packets_dropped_total.get()
);
}
Expand Down
4 changes: 2 additions & 2 deletions src/extensions/filters/compress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Compress {
fn failed_compression<T>(&self, err: Box<dyn std::error::Error>) -> Option<T> {
if self.metrics.packets_dropped_compress.get() % 1000 == 0 {
warn!(self.log, "Packets are being dropped as they could not be compressed";
"mode" => format!("{:?}", self.compression_mode), "error" => format!("{}", err),
"mode" => #?self.compression_mode, "error" => %err,
"count" => self.metrics.packets_dropped_compress.get());
}
self.metrics.packets_dropped_compress.inc();
Expand All @@ -199,7 +199,7 @@ impl Compress {
fn failed_decompression<T>(&self, err: Box<dyn std::error::Error>) -> Option<T> {
if self.metrics.packets_dropped_decompress.get() % 1000 == 0 {
warn!(self.log, "Packets are being dropped as they could not be decompressed";
"mode" => format!("{:?}", self.compression_mode), "error" => format!("{}", err),
"mode" => #?self.compression_mode, "error" => %err,
"count" => self.metrics.packets_dropped_decompress.get());
}
self.metrics.packets_dropped_decompress.inc();
Expand Down
2 changes: 1 addition & 1 deletion src/extensions/filters/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Filter for Debug {
}

fn write(&self, ctx: WriteContext) -> Option<WriteResponse> {
info!(self.log, "received endpoint packet"; "endpoint" => ctx.endpoint.address,
info!(self.log, "Received endpoint packet"; "endpoint" => ctx.endpoint.address,
"from" => ctx.from,
"to" => ctx.to,
"contents" => packet_to_string(ctx.contents.clone()));
Expand Down
12 changes: 6 additions & 6 deletions src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn start_metrics_server(
mut shutdown_rx: Receiver<()>,
log: Logger,
) {
info!(log, "starting metrics endpoint at {}", addr.to_string());
info!(log, "Starting metrics"; "address" => %addr);

let handler_log = log.clone();
let make_svc = make_service_fn(move |_conn| {
Expand Down Expand Up @@ -72,7 +72,7 @@ pub fn start_metrics_server(

tokio::spawn(async move {
if let Err(err) = server.await {
error!(log, "metrics server exited with an error: {}", err);
error!(log, "Metrics server exited with an error"; "error" => %err);
}
});
}
Expand All @@ -86,11 +86,11 @@ fn handle_request(log: Logger, method: &Method, path: &str, registry: Registry)
let encoder = TextEncoder::new();
let body = encoder
.encode(&registry.gather(), &mut buffer)
.map_err(|err| warn!(log, "failed to encode metrics: {:?}", err))
.map_err(|err| warn!(log, "Failed to encode metrics"; "error" => %err))
.and_then(|_| {
String::from_utf8(buffer)
.map(Body::from)
.map_err(|err| warn!(log, "failed to convert metrics to utf8: {:?}", err))
String::from_utf8(buffer).map(Body::from).map_err(
|err| warn!(log, "Failed to convert metrics to utf8"; "error" => %err),
)
});

match body {
Expand Down
24 changes: 11 additions & 13 deletions src/proxy/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,16 +281,15 @@ impl Server {
// We cannot recover from this error since
// it implies that the receiver has been dropped.
let reason =
"failed to send received packet over channel to worker".into();
"Failed to send received packet over channel to worker".into();
error!(log, "{}", reason);
return Err(reason);
}
}
err => {
// Socket error, we cannot recover from this so return an error instead.
let reason = format!("error processing receive socket: {:?}", err);
error!(log, "{}", reason);
return Err(reason);
error!(log, "Error processing receive socket"; "error" => #?err);
return Err(format!("error processing receive socket: {:?}", err));
}
}
}
Expand Down Expand Up @@ -320,13 +319,13 @@ impl Server {
match packet {
Some((recv_addr, packet)) => Self::process_downstream_received_packet((recv_addr, packet), &receive_config).await,
None => {
debug!(log, "worker-{} exiting: work sender channel was closed.", worker_id);
debug!(log, "Worker-{} exiting: work sender channel was closed.", worker_id);
return;
}
}
}
_ = shutdown_rx.changed() => {
debug!(log, "worker-{} exiting: received shutdown signal.", worker_id);
debug!(log, "Worker-{} exiting: received shutdown signal.", worker_id);
return;
}
}
Expand Down Expand Up @@ -449,19 +448,18 @@ impl Server {
} else {
warn!(
args.log,
"Could not find session for key: ({}:{})",
session_key.0.to_string(),
session_key.1.to_string()
"Could not find session";
"key" => format!("({}:{})", session_key.0.to_string(), session_key.1.to_string())
)
}
}
Err(err) => {
error!(args.log, "failed to ensure session exists"; "error" => %err);
error!(args.log, "Failed to ensure session exists"; "error" => %err);
}
}
}
Err(err) => {
error!(args.log, "failed to create session metrics"; "error" => %err);
error!(args.log, "Failed to create session metrics"; "error" => %err);
}
}
}
Expand Down Expand Up @@ -515,7 +513,7 @@ impl Server {

/// log_config outputs a log of what is configured
fn log_config(&self) {
info!(self.log, "Starting on port {}", self.config.proxy.port);
info!(self.log, "Starting"; "port" => self.config.proxy.port);
}

/// bind binds the local configured port
Expand All @@ -531,7 +529,7 @@ impl Server {
let now = if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) {
now.as_secs()
} else {
warn!(log, "failed to get current time when pruning sessions");
warn!(log, "Failed to get current time when pruning sessions");
return;
};

Expand Down
2 changes: 1 addition & 1 deletion src/proxy/server/resource_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl DynamicResourceManagers {
.await;
execution_result_tx
.send(result)
.map_err(|_err| warn!(log, "failed to send ADS client execution result on channel"))
.map_err(|_err| warn!(log, "Failed to send ADS client execution result on channel"))
.ok();
});

Expand Down
1 change: 0 additions & 1 deletion src/proxy/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ mod tests {
// Poll the state to wait for the change, because everything is async
for _ in 1..1000 {
let is_closed = sess.is_closed();
info!(t.log, "session closed?"; "closed" => is_closed);
if is_closed {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub async fn run(filter_factories: Vec<Box<dyn FilterFactory>>) -> Result<(), Er
Ok(())
}
Err(err) => {
info!(log, "Shutting down with error: {}", err);
info!(log, "Shutting down with error"; "error" => %err);
Err(Error(format!("{:?}", err)))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Drop for TestHelper {
.map_err(|err| {
warn!(
log,
"failed to send server shutdown over channel: {:?}", err
"Failed to send server shutdown over channel"; "error" => %err
)
})
.ok();
Expand Down
29 changes: 13 additions & 16 deletions src/xds/ads_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,18 @@ impl AdsClient {
if err.to_string().to_lowercase().contains("invalid url") {
return Err(ExecutionError::Message(format!("{:?}", err)));
}
Self::log_error_and_backoff(
error!(log, "Unable to connect to the XDS server"; "address" => server_addr, "error" => %err);
Self::backoff(
&log,
format!("unable to connect to the XDS server at {}: {:?}", server_addr, err),
&mut backoff
).await?;
}
Err(RpcSessionError::Receive(handlers, bk_off, status)) => {
resource_handlers = handlers;
backoff = bk_off;
Self::log_error_and_backoff(
error!(log, "Failed to receive from XDS server"; "address" => server_addr, "status" => #?status);
Self::backoff(
&log,
format!("failed to receive from XDS server {}: {:?}", server_addr,status),
&mut backoff
).await?;
}
Expand Down Expand Up @@ -273,7 +273,7 @@ impl AdsClient {
Box::new(err))
)?;
} else {
info!(log, "exiting send loop");
info!(log, "Exiting send loop");
break;
}
}
Expand Down Expand Up @@ -383,7 +383,7 @@ impl AdsClient {
let response = match response {
Ok(None) => {
// No more messages on the connection.
info!(log, "exiting receive loop - response stream closed.");
info!(log, "Exiting receive loop - response stream closed.");
return Ok(resource_handlers)
},
Err(err) => return Err(RpcSessionError::Receive(resource_handlers, backoff, err)),
Expand All @@ -403,12 +403,12 @@ impl AdsClient {
resource_handlers.listener_manager.on_listener_response(response).await;
} else {
metrics.update_failure_total.inc();
error!(log, "Unexpected resource with type_url={:?}", response.type_url);
error!(log, "Unexpected resource"; "type" => response.type_url);
}
}

_ = shutdown_rx.changed() => {
info!(log, "exiting receive loop - received shutdown signal.");
info!(log, "Exiting receive loop - received shutdown signal");
return Ok(resource_handlers)
}
}
Expand All @@ -429,21 +429,19 @@ impl AdsClient {
}
metrics.requests_total.inc();

debug!(log, "sending rpc discovery request {:?}", req);
debug!(log, "Sending rpc discovery"; "request" => #?req);

req_tx.send(req).await
}

async fn log_error_and_backoff<C: Clock>(
async fn backoff<C: Clock>(
log: &Logger,
error_msg: String,
backoff: &mut ExponentialBackoff<C>,
) -> Result<(), ExecutionError> {
error!(log, "{}", error_msg);
let delay = backoff
.next_backoff()
.ok_or(ExecutionError::BackoffLimitExceeded)?;
info!(log, "retrying in {:?}", delay);
info!(log, "Retrying"; "delay" => #?delay);
tokio::time::sleep(delay).await;
Ok(())
}
Expand Down Expand Up @@ -476,9 +474,8 @@ pub(super) async fn send_discovery_req(
.map_err(|err| {
warn!(
log,
"Failed to send discovery request of type `{}`: {}",
type_url,
err.to_string()
"Failed to send discovery request";
"type" => %type_url, "error" => %err
)
})
// ok is safe here since an error would mean that we've dropped/closed the receiving
Expand Down
4 changes: 2 additions & 2 deletions src/xds/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl ClusterManager {
// the cluster has been deleted since ADS handles resource ordering.
warn!(
self.log,
"Got endpoint for non-existing cluster {}", assignment.cluster_name
"Got endpoint for non-existing cluster"; "name" => assignment.cluster_name
);
}
}
Expand All @@ -228,7 +228,7 @@ impl ClusterManager {
.send(self.clusters.clone())
.await
.map_err(|err| {
warn!(self.log, "failed to send cluster updates downstream");
warn!(self.log, "Failed to send cluster updates downstream");
err
})
// ok is safe here because an error can only be due to downstream dropping
Expand Down
2 changes: 1 addition & 1 deletion src/xds/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl ListenerManager {
.send(Arc::new(filter_chain))
.await
.map_err(|err| {
warn!(self.log, "failed to send filter chain update on channel");
warn!(self.log, "Failed to send filter chain update on channel");
err
})
// ok is safe here because an error can only be due to the consumer dropping
Expand Down
2 changes: 1 addition & 1 deletion tests/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ on_write: DECOMPRESS

// game_client
let local_addr: SocketAddr = format!("127.0.0.1:{}", client_port).parse().unwrap();
info!(t.log, "Sending hello"; "addr" => local_addr);
info!(t.log, "Sending hello"; "address" => local_addr);
tx.send_to(b"hello", &local_addr).await.unwrap();

let expected = timeout(Duration::from_secs(5), rx.recv())
Expand Down
4 changes: 2 additions & 2 deletions tests/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ mod tests {

// game_client
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
info!(t.log, "Sending hello"; "addr" => local_addr);
info!(t.log, "Sending hello"; "address" => local_addr);
socket.send_to(b"hello", &local_addr).await.unwrap();

let result = recv_chan.recv().await.unwrap();
Expand Down Expand Up @@ -151,7 +151,7 @@ mod tests {

// game client
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
info!(t.log, "Sending hello"; "addr" => local_addr);
info!(t.log, "Sending hello"; "address" => local_addr);
socket.send_to(b"hello", &local_addr).await.unwrap();

// since the debug filter doesn't change the data, it should be exactly the same
Expand Down
2 changes: 1 addition & 1 deletion tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mod tests {

// game_client
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
info!(t.log, "Sending hello"; "addr" => local_addr);
info!(t.log, "Sending hello"; "address" => local_addr);
socket.send_to(b"hello", &local_addr).await.unwrap();

let _ = recv_chan.recv().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ mod tests {
source_response = source_discovery_response_rx.recv() => {
match source_response {
None => {
info!(log, "stopping updates to client: source was dropped");
info!(log, "Stopping updates to client: source was dropped");
return;
},
Some(result) => {
Expand Down

0 comments on commit 35975f1

Please sign in to comment.