From ee77e3862610a9fbef366f910ad96d379984eff7 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Wed, 14 Feb 2024 18:20:48 +0000 Subject: [PATCH] feat(fmt_style): enabling fmt style selection Signed-off-by: Abhilash Shetty --- Cargo.lock | 15 + control-plane/agents/src/bin/core/main.rs | 24 +- .../agents/src/bin/ha/cluster/main.rs | 28 +- control-plane/agents/src/bin/ha/node/main.rs | 23 +- .../csi-driver/src/bin/controller/main.rs | 33 ++- .../csi-driver/src/bin/node/main_.rs | 33 ++- control-plane/plugin/src/lib.rs | 20 +- control-plane/rest/service/src/main.rs | 23 +- k8s/operators/src/pool/main.rs | 28 +- utils/dependencies | 2 +- utils/utils-lib/Cargo.toml | 4 +- utils/utils-lib/src/tracing_telemetry.rs | 260 ++++++++---------- 12 files changed, 297 insertions(+), 196 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38eb787b6..b42f9579f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4305,6 +4305,16 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.17" @@ -4315,12 +4325,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -4472,6 +4485,8 @@ dependencies = [ "git-version-macro", "opentelemetry", "opentelemetry-jaeger", + "strum", + "strum_macros", "tracing", "tracing-filter", "tracing-opentelemetry", diff --git a/control-plane/agents/src/bin/core/main.rs b/control-plane/agents/src/bin/core/main.rs index 51a71208c..e2ba30e04 100644 --- a/control-plane/agents/src/bin/core/main.rs +++ b/control-plane/agents/src/bin/core/main.rs @@ -23,7 +23,7 @@ use std::{net::SocketAddr, num::ParseIntError}; use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR, ETCD_MAX_PAGE_LIMIT}; use stor_port::HostAccessControl; -use utils::tracing_telemetry::{trace::TracerProvider, KeyValue}; +use utils::tracing_telemetry::{trace::TracerProvider, FmtLayer, FmtStyle, KeyValue}; /// The Cli arguments for this binary. #[derive(Debug, Parser)] @@ -114,6 +114,14 @@ pub(crate) struct CliArgs { /// Etcd Pagination Limit. #[clap(long, default_value = ETCD_MAX_PAGE_LIMIT)] pub(crate) etcd_page_limit: u32, + + /// Formatting style to be used while logging. + #[clap(default_value = FmtStyle::Pretty.as_ref(), short, long)] + fmt_style: FmtStyle, + + /// Use ANSI colors for the logs. + #[clap(long)] + ansi_colors: bool, } impl CliArgs { fn args() -> Self { @@ -156,12 +164,14 @@ async fn main() -> anyhow::Result<()> { let cli_args = CliArgs::args(); utils::print_package_info!(); println!("Using options: {cli_args:?}"); - utils::tracing_telemetry::init_tracing_with_eventing( - "agent-core", - cli_args.tracing_tags.clone(), - cli_args.jaeger.clone(), - cli_args.events_url.clone(), - ); + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(FmtLayer::Stdout) + .with_style(cli_args.fmt_style) + .with_colours(cli_args.ansi_colors) + .with_jaeger(cli_args.jaeger.clone()) + .with_events_url(cli_args.events_url.clone()) + .with_tracing_tags(cli_args.tracing_tags.clone()) + .init("agent-core"); server(cli_args).await } diff --git a/control-plane/agents/src/bin/ha/cluster/main.rs b/control-plane/agents/src/bin/ha/cluster/main.rs index 8c5590559..ca85bdee9 100644 --- a/control-plane/agents/src/bin/ha/cluster/main.rs +++ b/control-plane/agents/src/bin/ha/cluster/main.rs @@ -5,9 +5,11 @@ use once_cell::sync::OnceCell; use std::net::SocketAddr; use tracing::info; use utils::{ - package_description, tracing_telemetry::KeyValue, version_info_str, - DEFAULT_CLUSTER_AGENT_SERVER_ADDR, DEFAULT_GRPC_CLIENT_ADDR, + package_description, + tracing_telemetry::{FmtLayer, FmtStyle, KeyValue}, + version_info_str, DEFAULT_CLUSTER_AGENT_SERVER_ADDR, DEFAULT_GRPC_CLIENT_ADDR, }; + mod etcd; mod nodes; mod server; @@ -48,6 +50,14 @@ struct Cli { /// Events message-bus endpoint url. #[clap(long, short)] events_url: Option, + + /// Formatting style to be used while logging. + #[clap(default_value = FmtStyle::Pretty.as_ref(), short, long)] + fmt_style: FmtStyle, + + /// If set, configures the output to be in ansi color format. + #[clap(long)] + ansi_colors: bool, } impl Cli { @@ -67,12 +77,14 @@ pub(crate) fn core_grpc<'a>() -> &'a CoreClient { } fn initialize_tracing(args: &Cli) { - utils::tracing_telemetry::init_tracing_with_eventing( - "agent-ha-cluster", - args.tracing_tags.clone(), - args.jaeger.clone(), - args.events_url.clone(), - ) + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(FmtLayer::Stdout) + .with_style(args.fmt_style) + .with_colours(args.ansi_colors) + .with_jaeger(args.jaeger.clone()) + .with_events_url(args.events_url.clone()) + .with_tracing_tags(args.tracing_tags.clone()) + .init("agent-ha-cluster"); } #[tokio::main] diff --git a/control-plane/agents/src/bin/ha/node/main.rs b/control-plane/agents/src/bin/ha/node/main.rs index c70c1c46d..d14a427e9 100644 --- a/control-plane/agents/src/bin/ha/node/main.rs +++ b/control-plane/agents/src/bin/ha/node/main.rs @@ -25,6 +25,7 @@ mod server; use detector::PathFailureDetector; use server::NodeAgentApiServer; +use utils::tracing_telemetry::{FmtLayer, FmtStyle}; /// TODO #[derive(Debug, Parser)] @@ -77,6 +78,14 @@ struct Cli { /// Events message-bus endpoint url. #[clap(long, short)] events_url: Option, + + /// Formatting style to be used while logging. + #[clap(default_value = FmtStyle::Pretty.as_ref(), short, long)] + fmt_style: FmtStyle, + + /// Enable ansi colors for logs. + #[clap(long)] + ansi_colors: bool, } static CLUSTER_AGENT_CLIENT: OnceCell = OnceCell::new(); @@ -107,12 +116,14 @@ async fn main() -> anyhow::Result<()> { utils::print_package_info!(); - utils::tracing_telemetry::init_tracing_with_eventing( - "agent-ha-node", - cli_args.tracing_tags.clone(), - cli_args.jaeger.clone(), - cli_args.events_url.clone(), - ); + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(FmtLayer::Stdout) + .with_style(cli_args.fmt_style) + .with_colours(cli_args.ansi_colors) + .with_jaeger(cli_args.jaeger.clone()) + .with_events_url(cli_args.events_url.clone()) + .with_tracing_tags(cli_args.tracing_tags.clone()) + .init("agent-ha-node"); CLUSTER_AGENT_CLIENT .set(ClusterAgentClient::new(cli_args.cluster_agent.clone(), None).await) diff --git a/control-plane/csi-driver/src/bin/controller/main.rs b/control-plane/csi-driver/src/bin/controller/main.rs index 8e79d280b..d06e07cb0 100644 --- a/control-plane/csi-driver/src/bin/controller/main.rs +++ b/control-plane/csi-driver/src/bin/controller/main.rs @@ -1,6 +1,3 @@ -use client::{ApiClientError, RestApiClient}; -use config::CsiControllerConfig; - mod client; mod config; mod controller; @@ -9,7 +6,10 @@ mod pvwatcher; mod server; use clap::{Arg, ArgMatches}; +use client::{ApiClientError, RestApiClient}; +use config::CsiControllerConfig; use tracing::info; +use utils::tracing_telemetry::{FmtLayer, FmtStyle}; const CSI_SOCKET: &str = "/var/tmp/csi.sock"; const CONCURRENCY_LIMIT: usize = 10; @@ -103,6 +103,18 @@ async fn main() -> anyhow::Result<()> { An orphan volume is a volume with no corresponding PV", ) ) + .arg( + Arg::new("fmt-style") + .long("fmt-style") + .default_value(FmtStyle::Pretty.as_ref()) + .help("Formatting style to be used while logging") + ) + .arg( + Arg::new("ansi-colors") + .long("ansi-colors") + .action(clap::ArgAction::SetTrue) + .help("Enable ansi color for logs") + ) .get_matches(); utils::print_package_info!(); @@ -111,11 +123,16 @@ async fn main() -> anyhow::Result<()> { utils::raw_version_str(), env!("CARGO_PKG_VERSION"), ); - utils::tracing_telemetry::init_tracing( - "csi-controller", - tags, - args.get_one::("jaeger").cloned(), - ); + let fmt_style = args.get_one::("fmt-style").unwrap(); + let ansi_colors = args.get_flag("ansi-colors"); + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(FmtLayer::Stdout) + .with_style(*fmt_style) + .with_colours(ansi_colors) + .with_jaeger(args.get_one::("jaeger").cloned()) + .with_tracing_tags(tags) + .init("csi-controller"); + let orphan_period = args .get_one::("orphan-vol-gc-period") .map(|p| p.parse::()) diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs index eea1ce006..373739e49 100644 --- a/control-plane/csi-driver/src/bin/node/main_.rs +++ b/control-plane/csi-driver/src/bin/node/main_.rs @@ -186,7 +186,20 @@ pub(super) async fn main() -> anyhow::Result<()> { .help( "The node selector label which this plugin will report as part of its topology.\n\ Example:\n --node-selector key=value --node-selector key2=value2", - ), + ) + , + ) + .arg( + Arg::new("fmt-style") + .long("fmt-style") + .default_value(FmtStyle::Pretty.as_ref()) + .help("Formatting style to be used while logging") + ) + .arg( + Arg::new("ansi-colors") + .long("ansi-colors") + .action(clap::ArgAction::SetTrue) + .help("Enable ANSI color for logs") ) .subcommand( clap::Command::new("fs-freeze") @@ -211,6 +224,10 @@ pub(super) async fn main() -> anyhow::Result<()> { ) ) .get_matches(); + let tags = utils::tracing_telemetry::default_tracing_tags( + utils::raw_version_str(), + env!("CARGO_PKG_VERSION"), + ); // Handle fs-freeze and fs-unfreeze commands. if let Some(cmd) = matches.subcommand() { @@ -218,7 +235,8 @@ pub(super) async fn main() -> anyhow::Result<()> { .with_writer(FmtLayer::Stderr) .with_style(FmtStyle::Compact) .with_colours(false) - .init(); + .with_tracing_tags(tags) + .init("csi-node"); match cmd { ("fs-freeze", arg_matches) => { let volume_id = arg_matches.get_one::("volume-id").unwrap(); @@ -242,7 +260,16 @@ pub(super) async fn main() -> anyhow::Result<()> { utils::raw_version_str(), env!("CARGO_PKG_VERSION"), ); - utils::tracing_telemetry::init_tracing("csi-node", tags, None); + + let fmt_style = matches.get_one::("fmt-style").unwrap(); + + let colors = matches.get_flag("ansi-colors"); + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(FmtLayer::Stdout) + .with_style(*fmt_style) + .with_colours(colors) + .with_tracing_tags(tags.clone()) + .init("csi-node"); // Validate presence of nvme_tcp kernel module and set nvme_core parameters. if let Err(error) = crate::dev::nvmf::check_nvme_tcp_module() { diff --git a/control-plane/plugin/src/lib.rs b/control-plane/plugin/src/lib.rs index b992300f5..7c2b304ad 100644 --- a/control-plane/plugin/src/lib.rs +++ b/control-plane/plugin/src/lib.rs @@ -5,6 +5,8 @@ extern crate lazy_static; use operations::Label; use resources::LabelResources; +use std::fmt::Debug; +use utils::tracing_telemetry::{FmtLayer, FmtStyle}; use crate::{ operations::{ @@ -60,17 +62,17 @@ impl CliArgs { utils::tracing_telemetry::default_tracing_tags(git_version, env!("CARGO_PKG_VERSION")); let fmt_layer = match std::env::var("RUST_LOG") { - Ok(_) => utils::tracing_telemetry::FmtLayer::Stderr, - Err(_) => utils::tracing_telemetry::FmtLayer::None, + Ok(_) => FmtLayer::Stderr, + Err(_) => FmtLayer::None, }; - utils::tracing_telemetry::init_tracing_ext( - env!("CARGO_PKG_NAME"), - tags, - self.jaeger.as_ref(), - fmt_layer, - None, - ); + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(fmt_layer) + .with_style(FmtStyle::Pretty) + .with_colours(false) + .with_jaeger(self.jaeger.clone()) + .with_tracing_tags(tags) + .init(env!("CARGO_PKG_NAME")); TracingFlusher {} } diff --git a/control-plane/rest/service/src/main.rs b/control-plane/rest/service/src/main.rs index 2793b7868..4c77fc028 100644 --- a/control-plane/rest/service/src/main.rs +++ b/control-plane/rest/service/src/main.rs @@ -76,6 +76,14 @@ pub(crate) struct CliArgs { /// The value 0 means the number of available physical CPUs is used. #[clap(long, short, default_value = utils::DEFAULT_REST_MAX_WORKER_THREADS)] max_workers: usize, + + /// Formatting style to be used while logging. + #[clap(default_value = FmtStyle::Pretty.as_ref(), short, long)] + fmt_style: FmtStyle, + + /// Use ANSI colors for logs. + #[clap(long)] + ansi_colors: bool, } impl CliArgs { fn args() -> Self { @@ -105,7 +113,7 @@ use clap::Parser; use grpc::{client::CoreClient, operations::jsongrpc::client::JsonGrpcClient}; use http::Uri; use stor_port::transport_api::{RequestMinTimeout, TimeoutOptions}; -use utils::tracing_telemetry::KeyValue; +use utils::tracing_telemetry::{FmtLayer, FmtStyle, KeyValue}; /// Extension trait for actix-web applications. pub trait OpenApiExt { @@ -205,11 +213,14 @@ async fn main() -> anyhow::Result<()> { utils::print_package_info!(); let cli_args = CliArgs::args(); println!("Using options: {:?}", &cli_args); - utils::tracing_telemetry::init_tracing( - "rest-server", - cli_args.tracing_tags.clone(), - cli_args.jaeger.clone(), - ); + + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(FmtLayer::Stdout) + .with_style(cli_args.fmt_style) + .with_colours(cli_args.ansi_colors) + .with_jaeger(cli_args.jaeger.clone()) + .with_tracing_tags(cli_args.tracing_tags.clone()) + .init("rest-server"); let app = move || { App::new() diff --git a/k8s/operators/src/pool/main.rs b/k8s/operators/src/pool/main.rs index f349b9eb0..baa4d0c56 100644 --- a/k8s/operators/src/pool/main.rs +++ b/k8s/operators/src/pool/main.rs @@ -32,6 +32,7 @@ use mayastorpool::client::{check_crd, delete, list}; use openapi::clients::{self, tower::Url}; use std::{collections::HashMap, sync::Arc, time::Duration}; use tracing::{error, info, trace, warn}; +use utils::tracing_telemetry::{FmtLayer, FmtStyle}; const PAGINATION_LIMIT: u32 = 100; const BACKOFF_PERIOD: u64 = 20; @@ -235,6 +236,18 @@ async fn main() -> anyhow::Result<()> { .action(clap::ArgAction::SetTrue) .help("do not attempt to validate the block device prior to pool creation"), ) + .arg( + Arg::new("fmt-style") + .long("fmt-style") + .default_value(FmtStyle::Pretty.as_ref()) + .help("Formatting style to be used while logging"), + ) + .arg( + Arg::new("ansi-colors") + .long("ansi-colors") + .action(clap::ArgAction::SetTrue) + .help("Enable ansi color for logs"), + ) .get_matches(); utils::print_package_info!(); @@ -243,11 +256,16 @@ async fn main() -> anyhow::Result<()> { utils::raw_version_str(), env!("CARGO_PKG_VERSION"), ); - utils::tracing_telemetry::init_tracing( - "dsp-operator", - tags, - matches.get_one::("jaeger").cloned(), - ); + + let fmt_style = matches.get_one::("fmt-style").unwrap(); + let ansi_colors = matches.get_flag("ansi-colors"); + utils::tracing_telemetry::TracingTelemetry::builder() + .with_writer(FmtLayer::Stdout) + .with_style(*fmt_style) + .with_colours(ansi_colors) + .with_jaeger(matches.get_one::("jaeger").cloned()) + .with_tracing_tags(tags) + .init("agent-ha-node"); pool_controller(matches).await?; utils::tracing_telemetry::flush_traces(); diff --git a/utils/dependencies b/utils/dependencies index e95cf1bed..d96035c16 160000 --- a/utils/dependencies +++ b/utils/dependencies @@ -1 +1 @@ -Subproject commit e95cf1bed516570e78b4d5e1cf9ae546d79286ba +Subproject commit d96035c16bc92d121db0fd1c06ab921fb3531847 diff --git a/utils/utils-lib/Cargo.toml b/utils/utils-lib/Cargo.toml index 6b7f9b558..f69d18baf 100644 --- a/utils/utils-lib/Cargo.toml +++ b/utils/utils-lib/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -tracing-subscriber = { version = "0.3.17", features = [ "env-filter" ] } +tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] } tracing-opentelemetry = "0.21.0" opentelemetry = { version = "0.20.0", features = ["rt-tokio-current-thread"] } opentelemetry-jaeger = { version = "0.19.0", features = ["rt-tokio-current-thread"] } @@ -14,3 +14,5 @@ tracing = "0.1.37" url = "2.4.1" event-publisher = { path = "../dependencies/event-publisher" } tracing-filter = { path = "../dependencies/tracing-filter" } +strum = "0.25.0" +strum_macros = "0.25.2" diff --git a/utils/utils-lib/src/tracing_telemetry.rs b/utils/utils-lib/src/tracing_telemetry.rs index c9d0b5978..ee63227b5 100644 --- a/utils/utils-lib/src/tracing_telemetry.rs +++ b/utils/utils-lib/src/tracing_telemetry.rs @@ -2,12 +2,9 @@ pub use opentelemetry::{global, trace, Context, KeyValue}; use event_publisher::event_handler::EventHandle; -use opentelemetry::sdk::{propagation::TraceContextPropagator, Resource}; +use opentelemetry::sdk::{propagation::TraceContextPropagator, trace::Tracer, Resource}; use tracing::Level; -use tracing_subscriber::{ - filter, fmt::writer::MakeWriterExt, layer::SubscriberExt, util::SubscriberInitExt, Layer, - Registry, -}; +use tracing_subscriber::{filter, layer::SubscriberExt, util::SubscriberInitExt, Layer, Registry}; /// Parse KeyValues from structopt's cmdline arguments pub fn parse_key_value(source: &str) -> Result { @@ -43,29 +40,6 @@ pub fn set_jaeger_env() { } } -/// Initialise tracing and optionally opentelemetry and eventing. -/// Tracing will have a stdout subscriber with pretty formatting. -pub fn init_tracing_with_eventing( - service_name: &str, - tracing_tags: Vec, - jaeger: Option, - events_url: Option, -) { - init_tracing_ext( - service_name, - tracing_tags, - jaeger, - FmtLayer::Stdout, - events_url, - ); -} - -/// Initialise tracing and optionally opentelemetry. -/// Tracing will have a stdout subscriber with pretty formatting. -pub fn init_tracing(service_name: &str, tracing_tags: Vec, jaeger: Option) { - init_tracing_ext(service_name, tracing_tags, jaeger, FmtLayer::Stdout, None); -} - /// Fmt Layer for console output. pub enum FmtLayer { /// Output traces to stdout. @@ -76,98 +50,28 @@ pub enum FmtLayer { None, } -/// Initialise tracing and optionally opentelemetry. -/// Tracing will have a stdout subscriber with pretty formatting. -pub fn init_tracing_ext( - service_name: &str, - mut tracing_tags: Vec, - jaeger: Option, - fmt_layer: FmtLayer, - events_url: Option, -) { - const EVENT_BUS: &str = "mbus-events-target"; - let (stdout, stderr) = match fmt_layer { - FmtLayer::Stdout => ( - Some( - tracing_subscriber::fmt::layer() - .with_writer(std::io::stdout.with_filter(|meta| meta.target() != EVENT_BUS)) - .pretty(), - ), - None, - ), - FmtLayer::Stderr => ( - None, - Some( - tracing_subscriber::fmt::layer() - .with_writer(std::io::stderr.with_filter(|meta| meta.target() != EVENT_BUS)) - .pretty(), - ), - ), - FmtLayer::None => (None, None), - }; - - // Get the optional eventing layer. - let events_layer = match events_url { - Some(url) => { - let target = filter::Targets::new().with_target(EVENT_BUS, Level::INFO); - Some(EventHandle::init(url.to_string(), service_name).with_filter(target)) - } - None => None, - }; - - let subscriber = Registry::default() - .with(tracing_filter::rust_log_filter()) - .with(stdout) - .with(stderr) - .with(events_layer); - - match jaeger { - Some(jaeger) => { - tracing_tags.append(&mut default_tracing_tags( - super::raw_version_str(), - env!("CARGO_PKG_VERSION"), - )); - let tracing_tags = - tracing_tags - .into_iter() - .fold(Vec::::new(), |mut acc, kv| { - if !acc.iter().any(|acc| acc.key == kv.key) { - acc.push(kv); - } - acc - }); - set_jaeger_env(); - - global::set_text_map_propagator(TraceContextPropagator::new()); - let tracer = opentelemetry_jaeger::new_agent_pipeline() - .with_endpoint(jaeger) - .with_service_name(service_name) - .with_trace_config( - opentelemetry::sdk::trace::Config::default() - .with_resource(Resource::new(tracing_tags)), - ) - .install_batch(opentelemetry::runtime::TokioCurrentThread) - .expect("Should be able to initialise the exporter"); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - subscriber.with(telemetry).init(); - } - None => subscriber.init(), - }; -} - /// Tracing telemetry style. +#[derive(Debug, Clone, Copy, strum_macros::EnumString, strum_macros::AsRefStr)] +#[strum(serialize_all = "lowercase")] pub enum FmtStyle { /// Compact style. Compact, - /// Pretty Style + /// Pretty Style. Pretty, + /// JSON Style. + Json, } +const EVENT_BUS: &str = "mbus-events-target"; + /// Tracing telemetry builder. pub struct TracingTelemetry { writer: FmtLayer, style: FmtStyle, colours: bool, + jaeger: Option, + events_url: Option, + tracing_tags: Vec, } impl TracingTelemetry { @@ -177,6 +81,9 @@ impl TracingTelemetry { writer: FmtLayer::Stdout, style: FmtStyle::Pretty, colours: true, + jaeger: None, + events_url: None, + tracing_tags: Vec::new(), } } /// Specify writer stream. @@ -191,52 +98,121 @@ impl TracingTelemetry { pub fn with_colours(self, colours: bool) -> TracingTelemetry { TracingTelemetry { colours, ..self } } + + /// Specify the jaeger endpoint, If any. + pub fn with_jaeger(self, jaeger: Option) -> TracingTelemetry { + TracingTelemetry { jaeger, ..self } + } + + /// Specify the events url, If any. + pub fn with_events_url(self, events_url: Option) -> TracingTelemetry { + TracingTelemetry { events_url, ..self } + } + + /// Specify the tracing tags, If any. + pub fn with_tracing_tags(self, tracing_tags: Vec) -> TracingTelemetry { + TracingTelemetry { + tracing_tags, + ..self + } + } + /// Initialize the telemetry instance. - pub fn init(self) { + pub fn init(self, service_name: &str) { + let stdout = tracing_subscriber::fmt::layer().with_writer(std::io::stdout); + let stderr = tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .with_ansi(self.colours); + let tracer: Option = self.jaeger.map(|jaeger| { + let tracing_tags = + self.tracing_tags + .into_iter() + .fold(Vec::::new(), |mut acc, kv| { + if !acc.iter().any(|acc| acc.key == kv.key) { + acc.push(kv); + } + acc + }); + set_jaeger_env(); + global::set_text_map_propagator(TraceContextPropagator::new()); + opentelemetry_jaeger::new_agent_pipeline() + .with_endpoint(jaeger) + .with_service_name(service_name) + .with_trace_config( + opentelemetry::sdk::trace::Config::default() + .with_resource(Resource::new(tracing_tags)), + ) + .install_batch(opentelemetry::runtime::TokioCurrentThread) + .expect("Should be able to initialise the exporter") + }); + + // Get the optional eventing layer. + let events_layer = self.events_url.map(|url| { + let target = filter::Targets::new().with_target(EVENT_BUS, Level::INFO); + EventHandle::init(url.to_string(), service_name).with_filter(target) + }); + + let subscriber = Registry::default() + .with(tracing_filter::rust_log_filter()) + .with(events_layer); + match (self.writer, self.style) { (FmtLayer::Stderr, FmtStyle::Compact) => { - let stderr = tracing_subscriber::fmt::layer() - .with_writer(std::io::stderr) - .compact() - .with_ansi(self.colours); - let subscriber = Registry::default() - .with(tracing_filter::rust_log_filter()) - .with(stderr); - subscriber.init(); + if let Some(tracer) = tracer { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(stderr.compact()).with(telemetry).init(); + } else { + subscriber.with(stderr.compact()).init(); + } } (FmtLayer::Stdout, FmtStyle::Compact) => { - let stdout = tracing_subscriber::fmt::layer() - .with_writer(std::io::stdout) - .compact() - .with_ansi(self.colours); - let subscriber = Registry::default() - .with(tracing_filter::rust_log_filter()) - .with(stdout); - subscriber.init(); + if let Some(tracer) = tracer { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(stdout.compact()).with(telemetry).init(); + } else { + subscriber.with(stdout.compact()).init(); + } } (FmtLayer::Stderr, FmtStyle::Pretty) => { - let stderr = tracing_subscriber::fmt::layer() - .with_writer(std::io::stderr) - .pretty() - .with_ansi(self.colours); - let subscriber = Registry::default() - .with(tracing_filter::rust_log_filter()) - .with(stderr); - subscriber.init(); + if let Some(tracer) = tracer { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(stderr.pretty()).with(telemetry).init(); + } else { + subscriber.with(stderr.pretty()).init(); + } } (FmtLayer::Stdout, FmtStyle::Pretty) => { - let stdout = tracing_subscriber::fmt::layer() - .with_writer(std::io::stdout) - .pretty() - .with_ansi(self.colours); - let subscriber = Registry::default() - .with(tracing_filter::rust_log_filter()) - .with(stdout); - subscriber.init(); + if let Some(tracer) = tracer { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(stdout.pretty()).with(telemetry).init(); + } else { + subscriber.with(stdout.pretty()).init(); + } + } + (FmtLayer::Stdout, FmtStyle::Json) => { + if let Some(tracer) = tracer { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(stdout.json()).with(telemetry).init(); + } else { + subscriber.with(stdout.json()).init(); + } + } + (FmtLayer::Stderr, FmtStyle::Json) => { + if let Some(tracer) = tracer { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(stderr.json()).with(telemetry).init(); + } else { + subscriber.with(stderr.json()).init(); + } } (FmtLayer::None, _) => { let subscriber = Registry::default().with(tracing_filter::rust_log_filter()); - subscriber.init() + if let Some(tracer) = tracer { + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(telemetry).init(); + } else { + subscriber.init() + } } }; }