From 2022ace7b42af3e8e474943d62a5bf368b8d4e7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20K=C3=BChle?= Date: Fri, 27 Oct 2023 07:11:30 +0200 Subject: [PATCH] Move RuntimeChannel type arg T to associated types (#1314) RuntimeChannel::batch_message_channel needs to be generic over the message type. The type used to be declared on the RuntimeChannel trait. This means a RuntimeChannel can only be used with one particular message type, which feels unfortunate. fn install>(runtime: R) { // Can't use the same runtime here. :-( TracerProvider::builder().with_batch_exporter(e, runtime); LoggerProvider::builder().with_batch_exporter(e, runtime); } This change moves the type argument to the batch_message_channel function and the associated types Receiver and Sender. Channels are still specific to a message type, but a RuntimeChannel can be used with any number of message types. fn install(runtime: R) { // It works. :-) TracerProvider::builder().with_batch_exporter(e, runtime); LoggerProvider::builder().with_batch_exporter(e, runtime); } This also means the BatchMessage types no longer need to be public. --- .../src/trace/exporter/jaeger_json.rs | 4 +- opentelemetry-datadog/src/exporter/mod.rs | 7 +-- opentelemetry-jaeger/src/exporter/runtime.rs | 4 +- opentelemetry-otlp/src/logs.rs | 6 +-- opentelemetry-otlp/src/span.rs | 5 +- opentelemetry-sdk/CHANGELOG.md | 2 + opentelemetry-sdk/src/logs/log_emitter.rs | 4 +- opentelemetry-sdk/src/logs/log_processor.rs | 16 +++--- opentelemetry-sdk/src/logs/mod.rs | 3 +- opentelemetry-sdk/src/runtime.rs | 52 ++++++++++++------- opentelemetry-sdk/src/trace/mod.rs | 3 +- opentelemetry-sdk/src/trace/provider.rs | 4 +- opentelemetry-sdk/src/trace/runtime_tests.rs | 8 +-- opentelemetry-sdk/src/trace/sampler.rs | 2 +- .../trace/sampler/jaeger_remote/sampler.rs | 10 ++-- opentelemetry-sdk/src/trace/span_processor.rs | 16 +++--- opentelemetry-zipkin/src/exporter/mod.rs | 7 +-- 17 files changed, 77 insertions(+), 76 deletions(-) diff --git a/opentelemetry-contrib/src/trace/exporter/jaeger_json.rs b/opentelemetry-contrib/src/trace/exporter/jaeger_json.rs index 085e51b121..38b25aa0a3 100644 --- a/opentelemetry-contrib/src/trace/exporter/jaeger_json.rs +++ b/opentelemetry-contrib/src/trace/exporter/jaeger_json.rs @@ -8,7 +8,7 @@ use opentelemetry::trace::{SpanId, TraceError}; use opentelemetry_sdk::{ export::trace::{ExportResult, SpanData, SpanExporter}, runtime::RuntimeChannel, - trace::{BatchMessage, Tracer, TracerProvider}, + trace::{Tracer, TracerProvider}, }; use opentelemetry_semantic_conventions::SCHEMA_URL; use std::collections::HashMap; @@ -213,7 +213,7 @@ fn opentelemetry_value_to_json(value: &opentelemetry::Value) -> (&str, serde_jso /// /// [`RuntimeChannel`]: opentelemetry_sdk::runtime::RuntimeChannel #[async_trait] -pub trait JaegerJsonRuntime: RuntimeChannel + std::fmt::Debug { +pub trait JaegerJsonRuntime: RuntimeChannel + std::fmt::Debug { /// Create a new directory if the given path does not exist yet async fn create_dir(&self, path: &Path) -> ExportResult; /// Write the provided content to a new file at the given path diff --git a/opentelemetry-datadog/src/exporter/mod.rs b/opentelemetry-datadog/src/exporter/mod.rs index 1ae28e6cd6..9b07183390 100644 --- a/opentelemetry-datadog/src/exporter/mod.rs +++ b/opentelemetry-datadog/src/exporter/mod.rs @@ -15,7 +15,7 @@ use opentelemetry_sdk::{ export::trace::{ExportResult, SpanData, SpanExporter}, resource::{ResourceDetector, SdkProvidedResourceDetector}, runtime::RuntimeChannel, - trace::{BatchMessage, Config, Tracer, TracerProvider}, + trace::{Config, Tracer, TracerProvider}, Resource, }; use opentelemetry_semantic_conventions as semcov; @@ -300,10 +300,7 @@ impl DatadogPipelineBuilder { /// Install the Datadog trace exporter pipeline using a batch span processor with the specified /// runtime. - pub fn install_batch>( - mut self, - runtime: R, - ) -> Result { + pub fn install_batch(mut self, runtime: R) -> Result { let (config, service_name) = self.build_config_and_service_name(); let exporter = self.build_exporter_with_service_name(service_name)?; let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter, runtime); diff --git a/opentelemetry-jaeger/src/exporter/runtime.rs b/opentelemetry-jaeger/src/exporter/runtime.rs index 7b52dddd1c..5348eefbcf 100644 --- a/opentelemetry-jaeger/src/exporter/runtime.rs +++ b/opentelemetry-jaeger/src/exporter/runtime.rs @@ -5,14 +5,14 @@ ))] use crate::exporter::addrs_and_family; use async_trait::async_trait; -use opentelemetry_sdk::{runtime::RuntimeChannel, trace::BatchMessage}; +use opentelemetry_sdk::runtime::RuntimeChannel; use std::net::ToSocketAddrs; /// Jaeger Trace Runtime is an extension to [`RuntimeChannel`]. /// /// [`RuntimeChannel`]: opentelemetry_sdk::runtime::RuntimeChannel #[async_trait] -pub trait JaegerTraceRuntime: RuntimeChannel + std::fmt::Debug { +pub trait JaegerTraceRuntime: RuntimeChannel + std::fmt::Debug { /// A communication socket between Jaeger client and agent. type Socket: std::fmt::Debug + Send + Sync; diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 2372b21248..21f8fbb0ed 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -19,7 +19,7 @@ use opentelemetry::{ global, logs::{LogError, LoggerProvider}, }; -use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel}; +use opentelemetry_sdk::{self, export::logs::LogData, runtime::RuntimeChannel}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -166,7 +166,7 @@ impl OtlpLogPipeline { /// Returns a [`Logger`] with the name `opentelemetry-otlp` and the current crate version. /// /// [`Logger`]: opentelemetry_sdk::logs::Logger - pub fn install_batch>( + pub fn install_batch( self, runtime: R, ) -> Result { @@ -198,7 +198,7 @@ fn build_simple_with_exporter( logger } -fn build_batch_with_exporter>( +fn build_batch_with_exporter( exporter: LogExporter, log_config: Option, runtime: R, diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 475ee1fc5c..a8503c87a4 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -12,7 +12,6 @@ use opentelemetry::{ use opentelemetry_sdk::{ self as sdk, export::trace::{ExportResult, SpanData}, - trace::BatchMessage, }; use opentelemetry_semantic_conventions::SCHEMA_URL; use sdk::runtime::RuntimeChannel; @@ -122,7 +121,7 @@ impl OtlpTracePipeline { /// `install_batch` will panic if not called within a tokio runtime /// /// [`Tracer`]: opentelemetry::trace::Tracer - pub fn install_batch>( + pub fn install_batch( self, runtime: R, ) -> Result { @@ -154,7 +153,7 @@ fn build_simple_with_exporter( tracer } -fn build_batch_with_exporter>( +fn build_batch_with_exporter( exporter: SpanExporter, trace_config: Option, runtime: R, diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index aaa4d14ad6..fb5712ecd7 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -52,10 +52,12 @@ `should_sample` changes `attributes` from `OrderMap` to `Vec`. +- **Breaking** Move type argument from `RuntimeChannel` to associated types [#1314](https://github.com/open-telemetry/opentelemetry-rust/pull/1314) ### Removed - Remove context from Metric force_flush [#1245](https://github.com/open-telemetry/opentelemetry-rust/pull/1245) +- Remove `logs::BatchMessage` and `trace::BatchMessage` types [#1314](https://github.com/open-telemetry/opentelemetry-rust/pull/1314) ### Fixed diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index ee15ae47c2..557ed552c5 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,4 +1,4 @@ -use super::{BatchLogProcessor, BatchMessage, Config, LogProcessor, SimpleLogProcessor}; +use super::{BatchLogProcessor, Config, LogProcessor, SimpleLogProcessor}; use crate::{ export::logs::{LogData, LogExporter}, runtime::RuntimeChannel, @@ -140,7 +140,7 @@ impl Builder { } /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use. - pub fn with_batch_exporter>( + pub fn with_batch_exporter( self, exporter: T, runtime: R, diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6bac467a5b..342be3395f 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -109,11 +109,11 @@ impl LogProcessor for SimpleLogProcessor { /// A [`LogProcessor`] that asynchronously buffers log records and reports /// them at a preconfigured interval. -pub struct BatchLogProcessor> { - message_sender: R::Sender, +pub struct BatchLogProcessor { + message_sender: R::Sender, } -impl> Debug for BatchLogProcessor { +impl Debug for BatchLogProcessor { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("BatchLogProcessor") .field("message_sender", &self.message_sender) @@ -121,7 +121,7 @@ impl> Debug for BatchLogProcessor { } } -impl> LogProcessor for BatchLogProcessor { +impl LogProcessor for BatchLogProcessor { fn emit(&self, data: LogData) { let result = self.message_sender.try_send(BatchMessage::ExportLog(data)); @@ -158,7 +158,7 @@ impl> LogProcessor for BatchLogProcessor { } } -impl> BatchLogProcessor { +impl BatchLogProcessor { pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); @@ -262,7 +262,7 @@ async fn export_with_timeout( batch: Vec, ) -> ExportResult where - R: RuntimeChannel, + R: RuntimeChannel, E: LogExporter + ?Sized, { if batch.is_empty() { @@ -323,7 +323,7 @@ pub struct BatchLogProcessorBuilder { impl BatchLogProcessorBuilder where E: LogExporter + 'static, - R: RuntimeChannel, + R: RuntimeChannel, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -372,7 +372,7 @@ where /// Messages sent between application thread and batch log processor's work thread. #[allow(clippy::large_enum_variant)] #[derive(Debug)] -pub enum BatchMessage { +enum BatchMessage { /// Export logs, usually called when the log is emitted. ExportLog(LogData), /// Flush the current buffer to the backend, it can be triggered by diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index a2ed90a7c6..45d16d5467 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -7,6 +7,5 @@ mod log_processor; pub use config::{config, Config}; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ - BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, BatchMessage, LogProcessor, - SimpleLogProcessor, + BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor, }; diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 565ebb8a48..7705c10e91 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -133,19 +133,22 @@ impl Runtime for AsyncStd { } } -/// `MessageRuntime` is an extension to [`Runtime`]. Currently, it provides a +/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a /// channel that is used by the [log] and [span] batch processors. /// /// [log]: crate::logs::BatchLogProcessor /// [span]: crate::trace::BatchSpanProcessor -pub trait RuntimeChannel: Runtime { +pub trait RuntimeChannel: Runtime { /// A future stream to receive batch messages from channels. - type Receiver: Stream + Send; + type Receiver: Stream + Send; /// A batch messages sender that can be sent across threads safely. - type Sender: TrySend + Debug; + type Sender: TrySend + Debug; /// Return the sender and receiver used to send batch messages. - fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver); + fn batch_message_channel( + &self, + capacity: usize, + ) -> (Self::Sender, Self::Receiver); } /// Error returned by a [`TrySend`] implementation. @@ -187,11 +190,14 @@ impl TrySend for tokio::sync::mpsc::Sender { #[cfg(feature = "rt-tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] -impl RuntimeChannel for Tokio { - type Receiver = tokio_stream::wrappers::ReceiverStream; - type Sender = tokio::sync::mpsc::Sender; - - fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) { +impl RuntimeChannel for Tokio { + type Receiver = tokio_stream::wrappers::ReceiverStream; + type Sender = tokio::sync::mpsc::Sender; + + fn batch_message_channel( + &self, + capacity: usize, + ) -> (Self::Sender, Self::Receiver) { let (sender, receiver) = tokio::sync::mpsc::channel(capacity); ( sender, @@ -202,11 +208,14 @@ impl RuntimeChannel for Tokio { #[cfg(feature = "rt-tokio-current-thread")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))] -impl RuntimeChannel for TokioCurrentThread { - type Receiver = tokio_stream::wrappers::ReceiverStream; - type Sender = tokio::sync::mpsc::Sender; - - fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) { +impl RuntimeChannel for TokioCurrentThread { + type Receiver = tokio_stream::wrappers::ReceiverStream; + type Sender = tokio::sync::mpsc::Sender; + + fn batch_message_channel( + &self, + capacity: usize, + ) -> (Self::Sender, Self::Receiver) { let (sender, receiver) = tokio::sync::mpsc::channel(capacity); ( sender, @@ -229,11 +238,14 @@ impl TrySend for async_std::channel::Sender { #[cfg(feature = "rt-async-std")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))] -impl RuntimeChannel for AsyncStd { - type Receiver = async_std::channel::Receiver; - type Sender = async_std::channel::Sender; - - fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) { +impl RuntimeChannel for AsyncStd { + type Receiver = async_std::channel::Receiver; + type Sender = async_std::channel::Sender; + + fn batch_message_channel( + &self, + capacity: usize, + ) -> (Self::Sender, Self::Receiver) { async_std::channel::bounded(capacity) } } diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index bb731f2132..3ef46c4925 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -26,8 +26,7 @@ pub use sampler::{Sampler, ShouldSample}; pub use span::Span; pub use span_limit::SpanLimits; pub use span_processor::{ - BatchConfig, BatchMessage, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, - SpanProcessor, + BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor, }; pub use tracer::Tracer; diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 30c05c2003..93572cc8ba 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -9,7 +9,7 @@ //! not duplicate this data to avoid that different [`Tracer`] instances //! of the [`TracerProvider`] have different versions of these data. use crate::runtime::RuntimeChannel; -use crate::trace::{BatchMessage, BatchSpanProcessor, SimpleSpanProcessor, Tracer}; +use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer}; use crate::{export::trace::SpanExporter, trace::SpanProcessor}; use crate::{InstrumentationLibrary, Resource}; use once_cell::sync::OnceCell; @@ -166,7 +166,7 @@ impl Builder { } /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use. - pub fn with_batch_exporter>( + pub fn with_batch_exporter( self, exporter: T, runtime: R, diff --git a/opentelemetry-sdk/src/trace/runtime_tests.rs b/opentelemetry-sdk/src/trace/runtime_tests.rs index e88ce596af..610d140b7a 100644 --- a/opentelemetry-sdk/src/trace/runtime_tests.rs +++ b/opentelemetry-sdk/src/trace/runtime_tests.rs @@ -6,8 +6,6 @@ use crate::export::trace::{ExportResult, SpanExporter}; use crate::runtime; #[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] use crate::runtime::RuntimeChannel; -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -use crate::trace::BatchMessage; use futures_util::future::BoxFuture; #[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] use opentelemetry::global::*; @@ -42,7 +40,7 @@ impl SpanCountExporter { } #[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -fn build_batch_tracer_provider>( +fn build_batch_tracer_provider( exporter: SpanCountExporter, runtime: R, ) -> crate::trace::TracerProvider { @@ -61,9 +59,7 @@ fn build_simple_tracer_provider(exporter: SpanCountExporter) -> crate::trace::Tr } #[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] -async fn test_set_provider_in_tokio>( - runtime: R, -) -> Arc { +async fn test_set_provider_in_tokio(runtime: R) -> Arc { let exporter = SpanCountExporter::new(); let span_count = exporter.span_count.clone(); let _ = set_tracer_provider(build_batch_tracer_provider(exporter, runtime)); diff --git a/opentelemetry-sdk/src/trace/sampler.rs b/opentelemetry-sdk/src/trace/sampler.rs index dc8bde293f..02ba4f3f8c 100644 --- a/opentelemetry-sdk/src/trace/sampler.rs +++ b/opentelemetry-sdk/src/trace/sampler.rs @@ -156,7 +156,7 @@ impl Sampler { where C: HttpClient + 'static, Sampler: ShouldSample, - R: crate::runtime::RuntimeChannel, + R: crate::runtime::RuntimeChannel, Svc: Into, { JaegerRemoteSamplerBuilder::new(runtime, http_client, default_sampler, service_name) diff --git a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs index 07c05b94b1..6f942cbd7f 100644 --- a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs +++ b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs @@ -1,7 +1,7 @@ use crate::runtime::RuntimeChannel; use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse; use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner; -use crate::trace::{BatchMessage, Sampler, ShouldSample}; +use crate::trace::{Sampler, ShouldSample}; use futures_util::{stream, StreamExt as _}; use http::Uri; use opentelemetry::trace::{Link, SamplingResult, SpanKind, TraceError, TraceId}; @@ -18,7 +18,7 @@ const DEFAULT_REMOTE_SAMPLER_ENDPOINT: &str = "http://localhost:5778/sampling"; #[derive(Debug)] pub struct JaegerRemoteSamplerBuilder where - R: RuntimeChannel, + R: RuntimeChannel, C: HttpClient + 'static, S: ShouldSample + 'static, { @@ -35,7 +35,7 @@ impl JaegerRemoteSamplerBuilder where C: HttpClient + 'static, S: ShouldSample + 'static, - R: RuntimeChannel, + R: RuntimeChannel, { pub(crate) fn new( runtime: R, @@ -155,7 +155,7 @@ impl JaegerRemoteSampler { leaky_bucket_size: f64, ) -> Self where - R: RuntimeChannel, + R: RuntimeChannel, C: HttpClient + 'static, S: ShouldSample + 'static, { @@ -185,7 +185,7 @@ impl JaegerRemoteSampler { shutdown: futures_channel::mpsc::Receiver<()>, endpoint: Uri, ) where - R: RuntimeChannel, + R: RuntimeChannel, C: HttpClient + 'static, { // todo: review if we need 'static here diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 9dd60941d6..73c92f6903 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -252,11 +252,11 @@ enum Message { /// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html /// [`tokio`]: https://tokio.rs /// [`async-std`]: https://async.rs -pub struct BatchSpanProcessor> { - message_sender: R::Sender, +pub struct BatchSpanProcessor { + message_sender: R::Sender, } -impl> fmt::Debug for BatchSpanProcessor { +impl fmt::Debug for BatchSpanProcessor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BatchSpanProcessor") .field("message_sender", &self.message_sender) @@ -264,7 +264,7 @@ impl> fmt::Debug for BatchSpanProcessor { } } -impl> SpanProcessor for BatchSpanProcessor { +impl SpanProcessor for BatchSpanProcessor { fn on_start(&self, _span: &mut Span, _cx: &Context) { // Ignored } @@ -310,7 +310,7 @@ impl> SpanProcessor for BatchSpanProcessor { // 2. Most of the messages will be ExportSpan. #[allow(clippy::large_enum_variant)] #[derive(Debug)] -pub enum BatchMessage { +enum BatchMessage { /// Export spans, usually called when span ends ExportSpan(SpanData), /// Flush the current buffer to the backend, it can be triggered by @@ -328,7 +328,7 @@ struct BatchSpanProcessorInternal { config: BatchConfig, } -impl> BatchSpanProcessorInternal { +impl BatchSpanProcessorInternal { async fn flush(&mut self, res_channel: Option>) { let export_task = self.export(); let task = Box::pin(async move { @@ -465,7 +465,7 @@ impl> BatchSpanProcessorInternal { } } -impl> BatchSpanProcessor { +impl BatchSpanProcessor { pub(crate) fn new(exporter: Box, config: BatchConfig, runtime: R) -> Self { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); @@ -648,7 +648,7 @@ pub struct BatchSpanProcessorBuilder { impl BatchSpanProcessorBuilder where E: SpanExporter + 'static, - R: RuntimeChannel, + R: RuntimeChannel, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index 54eea89008..bc124593a5 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -12,7 +12,7 @@ use opentelemetry_sdk::{ export::{trace, ExportError}, resource::{ResourceDetector, SdkProvidedResourceDetector}, runtime::RuntimeChannel, - trace::{BatchMessage, Config, Tracer, TracerProvider}, + trace::{Config, Tracer, TracerProvider}, Resource, }; use opentelemetry_semantic_conventions as semcov; @@ -184,10 +184,7 @@ impl ZipkinPipelineBuilder { /// Install the Zipkin trace exporter pipeline with a batch span processor using the specified /// runtime. - pub fn install_batch>( - mut self, - runtime: R, - ) -> Result { + pub fn install_batch(mut self, runtime: R) -> Result { let (config, endpoint) = self.init_config_and_endpoint(); let exporter = self.init_exporter_with_endpoint(endpoint)?; let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter, runtime);