Skip to content

Commit

Permalink
fix: correct panic in tracing for comms (#3499)
Browse files Browse the repository at this point in the history
Description
---
Fixes a panic in tracing.
Adds comments explaining how to view the Jaeger UI after starting the container in Docker.

Motivation and Context
---
Without this PR, the following panic occurs when running with the `--tracing-enabled` flag:
```
thread 'tokio-runtime-worker' panicked at 'Span to follow not found, this is a bug', .cargo\registry\src\github.com-1ecc6299db9ec823\tracing-opentelemetry-0.15.0\src\layer.rs:484:14
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'tokio-runtime-worker' panicked at 'Mutex poisoned: PoisonError { .. }', .cargo\registry\src\github.com-1ecc6299db9ec823\tracing-subscriber-0.2.20\src\registry\sharded.rs:400:58
```

How Has This Been Tested?
---
Manually
  • Loading branch information
StriderDM authored Oct 27, 2021
1 parent 2fde54d commit af15fcc
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 64 deletions.
2 changes: 2 additions & 0 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ async fn run_node(node_config: Arc<GlobalConfig>, bootstrap: ConfigBootstrap) ->
fn enable_tracing() {
// To run:
// docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest
// To view the UI after starting the container (default):
// http://localhost:16686
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("tari::base_node")
Expand Down
41 changes: 22 additions & 19 deletions applications/tari_console_wallet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn main() {
eprintln!("{:?}", exit_code);
error!(
target: LOG_TARGET,
"Exiting with code ({}): {:?}",
"Exiting with code ({:?}): {:?}",
exit_code.as_i32(),
exit_code
);
Expand All @@ -65,6 +65,10 @@ fn main_inner() -> Result<(), ExitCodes> {

let (bootstrap, global_config, _) = init_configuration(ApplicationType::ConsoleWallet)?;

if bootstrap.tracing_enabled {
enable_tracing();
}

info!(
target: LOG_TARGET,
"== {} ({}) ==",
Expand All @@ -91,7 +95,6 @@ fn main_inner() -> Result<(), ExitCodes> {
info!(target: LOG_TARGET, "Default configuration created. Done.");
}

enable_tracing_if_specified(&bootstrap);
// get command line password if provided
let arg_password = bootstrap.password.clone();
let seed_words_file_name = bootstrap.seed_words_file_name.clone();
Expand Down Expand Up @@ -185,21 +188,21 @@ fn get_recovery_master_key(
}
}

fn enable_tracing_if_specified(bootstrap: &ConfigBootstrap) {
if bootstrap.tracing_enabled {
// To run: docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 \
// jaegertracing/all-in-one:latest
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("tari::console_wallet")
.with_tags(vec![KeyValue::new("pid", process::id().to_string()), KeyValue::new("current_exe", env::current_exe().unwrap().to_str().unwrap_or_default().to_owned())])
// TODO: uncomment when using tokio 1
// .install_batch(opentelemetry::runtime::Tokio)
.install_simple()
.unwrap();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default().with(telemetry);
tracing::subscriber::set_global_default(subscriber)
.expect("Tracing could not be set. Try running without `--tracing-enabled`");
}
/// Installs a process-wide `tracing` subscriber that forwards spans to Jaeger.
///
/// The Jaeger pipeline is registered under the service name `tari::console_wallet` and is
/// tagged with the process id and the path of the current executable. If the executable path
/// cannot be determined, or is not valid UTF-8, the `current_exe` tag is left empty rather than
/// aborting start-up.
///
/// # Panics
/// Panics if the Jaeger pipeline cannot be installed or if the global default subscriber
/// cannot be set.
fn enable_tracing() {
    // To run:
    // docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest
    // To view the UI after starting the container (default):
    // http://localhost:16686
    global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());

    // The executable path is only a diagnostic tag, so a failure to resolve it must not
    // panic: fall back to an empty string instead.
    let current_exe = env::current_exe()
        .ok()
        .and_then(|path| path.to_str().map(str::to_owned))
        .unwrap_or_default();

    let tracer = opentelemetry_jaeger::new_pipeline()
        .with_service_name("tari::console_wallet")
        .with_tags(vec![
            KeyValue::new("pid", process::id().to_string()),
            KeyValue::new("current_exe", current_exe),
        ])
        // TODO: uncomment when using tokio 1
        // .install_batch(opentelemetry::runtime::Tokio)
        .install_simple()
        .expect("Jaeger tracing pipeline could not be installed. Try running without `--tracing-enabled`");

    let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
    let subscriber = Registry::default().with(telemetry);
    tracing::subscriber::set_global_default(subscriber)
        .expect("Tracing could not be set. Try running without `--tracing-enabled`");
}
10 changes: 3 additions & 7 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,10 @@ where
use ConnectionManagerRequest::*;
trace!(target: LOG_TARGET, "Connection manager got request: {:?}", request);
match request {
DialPeer {
node_id,
reply_tx,
tracing_id: _tracing,
} => {
DialPeer { node_id, reply_tx } => {
let tracing_id = tracing::Span::current().id();
let span = span!(Level::TRACE, "connection_manager::handle_request");
// This causes a panic for some reason?
// span.follows_from(tracing_id);
span.follows_from(tracing_id);
self.dial_peer(node_id, reply_tx).instrument(span).await
},
CancelDial(node_id) => {
Expand Down
11 changes: 3 additions & 8 deletions comms/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use tokio::{
time,
};
use tokio_stream::StreamExt;
use tracing::{self, span, Instrument, Level, Span};
use tracing::{self, span, Instrument, Level};

const LOG_TARGET: &str = "comms::connection_manager::peer_connection";

Expand Down Expand Up @@ -116,7 +116,6 @@ pub enum PeerConnectionRequest {
OpenSubstream {
protocol_id: ProtocolId,
reply_tx: oneshot::Sender<Result<NegotiatedSubstream<Substream>, PeerConnectionError>>,
tracing_id: Option<tracing::span::Id>,
},
/// Disconnect all substreams and close the transport connection
Disconnect(bool, oneshot::Sender<Result<(), PeerConnectionError>>),
Expand Down Expand Up @@ -207,7 +206,6 @@ impl PeerConnection {
.send(PeerConnectionRequest::OpenSubstream {
protocol_id: protocol_id.clone(),
reply_tx,
tracing_id: Span::current().id(),
})
.await?;
reply_rx
Expand Down Expand Up @@ -394,11 +392,8 @@ impl PeerConnectionActor {
async fn handle_request(&mut self, request: PeerConnectionRequest) {
use PeerConnectionRequest::*;
match request {
OpenSubstream {
protocol_id,
reply_tx,
tracing_id,
} => {
OpenSubstream { protocol_id, reply_tx } => {
let tracing_id = tracing::Span::current().id();
let span = span!(Level::TRACE, "handle_request");
span.follows_from(tracing_id);
let result = self.open_negotiated_protocol_stream(protocol_id).instrument(span).await;
Expand Down
7 changes: 1 addition & 6 deletions comms/src/connection_manager/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub enum ConnectionManagerRequest {
DialPeer {
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
tracing_id: Option<tracing::span::Id>,
},
/// Cancels a pending dial if one exists
CancelDial(NodeId),
Expand Down Expand Up @@ -100,11 +99,7 @@ impl ConnectionManagerRequester {
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
) -> Result<(), ConnectionManagerError> {
self.sender
.send(ConnectionManagerRequest::DialPeer {
node_id,
reply_tx,
tracing_id: tracing::Span::current().id(),
})
.send(ConnectionManagerRequest::DialPeer { node_id, reply_tx })
.await
.map_err(|_| ConnectionManagerError::SendToActorFailed)?;
Ok(())
Expand Down
8 changes: 2 additions & 6 deletions comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,9 @@ impl ConnectivityManagerActor {
GetConnectivityStatus(reply) => {
let _ = reply.send(self.status);
},
DialPeer {
node_id,
reply_tx,
tracing_id,
} => {
DialPeer { node_id, reply_tx } => {
let tracing_id = tracing::Span::current().id();
let span = span!(Level::TRACE, "handle_request");
// let _e = span.enter();
span.follows_from(tracing_id);
async move {
match self.pool.get(&node_id) {
Expand Down
3 changes: 0 additions & 3 deletions comms/src/connectivity/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub enum ConnectivityRequest {
DialPeer {
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
tracing_id: Option<tracing::span::Id>,
},
GetConnectivityStatus(oneshot::Sender<ConnectivityStatus>),
SelectConnections(
Expand Down Expand Up @@ -131,7 +130,6 @@ impl ConnectivityRequester {
.send(ConnectivityRequest::DialPeer {
node_id: peer.clone(),
reply_tx: Some(reply_tx),
tracing_id: tracing::Span::current().id(),
})
.await
.map_err(|_| ConnectivityError::ActorDisconnected)?;
Expand Down Expand Up @@ -171,7 +169,6 @@ impl ConnectivityRequester {
self.sender.send(ConnectivityRequest::DialPeer {
node_id: peer,
reply_tx: None,
tracing_id: tracing::Span::current().id(),
})
}))
.await
Expand Down
6 changes: 1 addition & 5 deletions comms/src/test_utils/mocks/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,7 @@ impl ConnectionManagerMock {
self.state.inc_call_count();
self.state.add_call(format!("{:?}", req)).await;
match req {
DialPeer {
node_id,
mut reply_tx,
tracing_id: _,
} => {
DialPeer { node_id, mut reply_tx } => {
// Send Ok(conn) if we have an active connection, otherwise Err(DialConnectFailedAllAddresses)
let result = self
.state
Expand Down
6 changes: 1 addition & 5 deletions comms/src/test_utils/mocks/connectivity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,7 @@ impl ConnectivityManagerMock {
use ConnectivityRequest::*;
self.state.add_call(format!("{:?}", req)).await;
match req {
DialPeer {
node_id,
reply_tx,
tracing_id: _,
} => {
DialPeer { node_id, reply_tx } => {
self.state.add_dialed_peer(node_id.clone()).await;
// No reply, no reason to do anything in the mock
if reply_tx.is_none() {
Expand Down
6 changes: 1 addition & 5 deletions comms/src/test_utils/mocks/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,7 @@ impl PeerConnectionMock {
use PeerConnectionRequest::*;
self.state.inc_call_count();
match req {
OpenSubstream {
protocol_id,
reply_tx,
tracing_id: _,
} => match self.state.open_substream().await {
OpenSubstream { protocol_id, reply_tx } => match self.state.open_substream().await {
Ok(stream) => {
let negotiated_substream = NegotiatedSubstream {
protocol: protocol_id,
Expand Down

0 comments on commit af15fcc

Please sign in to comment.