Skip to content

Commit

Permalink
Rework tracing spans for background tasks
Browse files Browse the repository at this point in the history
spawn_background_task receives a Span constructed by the caller.
This allows embedding contextual information for the task, which is
used to reduce repetition in logging macros for the workers.
  • Loading branch information
mzabaluev committed Jan 11, 2022
1 parent 914373c commit d1ae093
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 104 deletions.
8 changes: 4 additions & 4 deletions relayer/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::RwLock;

use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, error_span, info, trace, warn};

use ibc::{
core::ics24_host::identifier::{ChainId, ChannelId, PortId},
Expand Down Expand Up @@ -166,7 +166,7 @@ fn spawn_batch_worker<Chain: ChainHandle>(
subscriptions: Arc<RwLock<Vec<(Chain, Subscription)>>>,
) -> TaskHandle {
spawn_background_task(
"supervisor_batch".to_string(),
error_span!("supervisor_batch"),
Some(Duration::from_millis(500)),
move || -> Result<Next, TaskError<Infallible>> {
if let Some((chain, batch)) = try_recv_multiple(&subscriptions.acquire_read()) {
Expand Down Expand Up @@ -194,7 +194,7 @@ pub fn spawn_cmd_worker<Chain: ChainHandle>(
cmd_rx: Receiver<SupervisorCmd>,
) -> TaskHandle {
spawn_background_task(
"supervisor_cmd".to_string(),
error_span!("supervisor_cmd"),
Some(Duration::from_millis(500)),
move || {
if let Ok(cmd) = cmd_rx.try_recv() {
Expand Down Expand Up @@ -237,7 +237,7 @@ pub fn spawn_rest_worker<Chain: ChainHandle>(
rest_rx: rest::Receiver,
) -> TaskHandle {
spawn_background_task(
"supervisor_rest".to_string(),
error_span!("supervisor_rest"),
Some(Duration::from_millis(500)),
move || -> Result<Next, TaskError<Infallible>> {
handle_rest_requests(
Expand Down
12 changes: 4 additions & 8 deletions relayer/src/util/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use core::time::Duration;
use crossbeam_channel::{bounded, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
use tracing::{error, info, span, warn};
use tracing::{error, info, warn};

use crate::util::lock::LockExt;

Expand Down Expand Up @@ -86,23 +86,19 @@ pub enum Next {
[`TaskHandle`].
*/
pub fn spawn_background_task<E: Display>(
task_name: String,
span: tracing::Span,
interval_pause: Option<Duration>,
mut step_runner: impl FnMut() -> Result<Next, TaskError<E>> + Send + Sync + 'static,
) -> TaskHandle {
let span = span!(tracing::Level::ERROR, "task", name = %task_name);
let _entered = span.enter();

info!("spawning");
info!(parent: &span, "spawning");

let stopped = Arc::new(RwLock::new(false));
let write_stopped = stopped.clone();

let (shutdown_sender, receiver) = bounded(1);
let thread_span = span.clone();

let join_handle = thread::spawn(move || {
let _entered = thread_span.enter();
let _entered = span.enter();
loop {
match receiver.try_recv() {
Ok(()) => {
Expand Down
14 changes: 4 additions & 10 deletions relayer/src/worker/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::time::Duration;
use crossbeam_channel::Receiver;
use tracing::debug;
use tracing::{debug, error_span};

use crate::channel::Channel as RelayChannel;
use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle};
Expand All @@ -20,7 +20,7 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
cmd_rx: Receiver<WorkerCmd>,
) -> TaskHandle {
spawn_background_task(
format!("ChannelWorker({})", channel.short_name()),
error_span!("ChannelWorker", channel = %channel.short_name()),
Some(Duration::from_millis(200)),
move || {
if let Ok(cmd) = cmd_rx.try_recv() {
Expand All @@ -29,10 +29,7 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
// there can be up to two event for this channel, e.g. init and try.
// process the last event, the one with highest "rank".
let last_event = batch.events.last();
debug!(
channel = %channel.short_name(),
"channel worker starts processing {:#?}", last_event
);
debug!("starts processing {:#?}", last_event);

if let Some(event) = last_event {
let mut handshake_channel = RelayChannel::restore_from_event(
Expand All @@ -54,10 +51,7 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
height: current_height,
new_block: _,
} => {
debug!(
channel = %channel.short_name(),
"Channel worker starts processing block event at {:#?}", current_height
);
debug!("starts processing block event at {:#?}", current_height);

let height = current_height
.decrement()
Expand Down
51 changes: 32 additions & 19 deletions relayer/src/worker/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::convert::Infallible;
use core::time::Duration;
use crossbeam_channel::Receiver;
use tracing::{debug, trace, warn};
use tracing::{debug, span, trace, warn};

use ibc::events::IbcEvent;

Expand All @@ -19,13 +19,19 @@ pub fn spawn_refresh_client<ChainA: ChainHandle, ChainB: ChainHandle>(
) -> Option<TaskHandle> {
if client.is_expired_or_frozen() {
warn!(
"skipping refresh client task on frozen client: {}",
client.id()
client = %client.id,
"skipping refresh client task on frozen client",
);
None
} else {
Some(spawn_background_task(
format!("RefreshClientWorker({})", client),
span!(
tracing::Level::ERROR,
"RefreshClientWorker",
client = %client.id,
src_chain = %client.src_chain.id(),
dst_chain = %client.dst_chain.id(),
),
Some(Duration::from_secs(1)),
move || {
let res = client.refresh().map_err(|e| {
Expand All @@ -52,40 +58,47 @@ pub fn detect_misbehavior_task<ChainA: ChainHandle, ChainB: ChainHandle>(
) -> Option<TaskHandle> {
if client.is_expired_or_frozen() {
warn!(
"skipping detect misbehavior task on frozen client: {}",
client.id()
client = %client.id(),
"skipping detect misbehavior task on frozen client",
);
return None;
}

{
debug!("[{}] doing first misbehavior check", client);
let _span = span!(
tracing::Level::DEBUG,
"DetectMisbehaviorFirstCheck",
client = %client.id,
src_chain = %client.src_chain.id(),
dst_chain = %client.dst_chain.id(),
)
.entered();
debug!("doing first check");
let misbehavior_result = client.detect_misbehaviour_and_submit_evidence(None);
debug!(
"[{}] detect misbehavior result: {:?}",
client, misbehavior_result
);
trace!("detect misbehavior result: {:?}", misbehavior_result);
}

let handle = spawn_background_task(
format!("DetectMisbehaviorWorker({})", client),
span!(
tracing::Level::ERROR,
"DetectMisbehaviorWorker",
client = %client.id,
src_chain = %client.src_chain.id(),
dst_chain = %client.dst_chain.id(),
),
Some(Duration::from_millis(600)),
move || -> Result<Next, TaskError<Infallible>> {
if let Ok(cmd) = receiver.try_recv() {
match cmd {
WorkerCmd::IbcEvents { batch } => {
trace!("[{}] worker received batch: {:?}", client, batch);
trace!("received batch: {:?}", batch);

for event in batch.events {
if let IbcEvent::UpdateClient(update) = event {
debug!("[{}] checking misbehavior for updated client", client);
debug!("checking misbehavior for updated client");
let misbehavior_result =
client.detect_misbehaviour_and_submit_evidence(Some(update));
trace!(
"[{}] detect misbehavior result: {:?}",
client,
misbehavior_result
);
trace!("detect misbehavior result: {:?}", misbehavior_result);

match misbehavior_result {
MisbehaviourResults::ValidClient => {}
Expand Down
15 changes: 4 additions & 11 deletions relayer/src/worker/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::time::Duration;
use crossbeam_channel::Receiver;
use tracing::debug;
use tracing::{debug, error_span};

use crate::connection::Connection as RelayConnection;
use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle};
Expand All @@ -20,7 +20,7 @@ pub fn spawn_connection_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
cmd_rx: Receiver<WorkerCmd>,
) -> TaskHandle {
spawn_background_task(
format!("ConnectionWorker({})", connection.short_name()),
error_span!("ConnectionWorker", connection = %connection.short_name()),
Some(Duration::from_millis(200)),
move || {
if let Ok(cmd) = cmd_rx.try_recv() {
Expand All @@ -30,10 +30,7 @@ pub fn spawn_connection_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
// process the last event, the one with highest "rank".
let last_event = batch.events.last();

debug!(
connection = %connection.short_name(),
"connection worker starts processing {:#?}", last_event
);
debug!("starts processing {:#?}", last_event);

if let Some(event) = last_event {
let mut handshake_connection = RelayConnection::restore_from_event(
Expand All @@ -56,11 +53,7 @@ pub fn spawn_connection_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
height: current_height,
new_block: _,
} => {
debug!(
connection = %connection.short_name(),
"connection worker starts processing block event at {}",
current_height
);
debug!("starts processing block event at {}", current_height);

let height = current_height
.decrement()
Expand Down
Loading

0 comments on commit d1ae093

Please sign in to comment.