Skip to content

Commit

Permalink
Move RuntimeChannel type arg T to associated types (#1314)
Browse files Browse the repository at this point in the history
RuntimeChannel::batch_message_channel needs to be generic over the
message type. The type used to be declared on the RuntimeChannel<T>
trait. This means a RuntimeChannel can only be used with one particular
message type, which feels unfortunate.

    fn install<R: RuntimeChannel<??::BatchMessage>>(runtime: R) {
        // Can't use the same runtime here. :-(
	TracerProvider::builder().with_batch_exporter(e, runtime);
	LoggerProvider::builder().with_batch_exporter(e, runtime);
    }

This change moves the type argument to the batch_message_channel<T>
function and the associated types Receiver<T> and Sender<T>. Channels
are still specific to a message type, but a RuntimeChannel can be used
with any number of message types.

    fn install<R: RuntimeChannel>(runtime: R) {
        // It works. :-)
	TracerProvider::builder().with_batch_exporter(e, runtime);
	LoggerProvider::builder().with_batch_exporter(e, runtime);
    }

This also means the BatchMessage types no longer need to be public.
  • Loading branch information
frigus02 authored Oct 27, 2023
1 parent ad18037 commit 2022ace
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 76 deletions.
4 changes: 2 additions & 2 deletions opentelemetry-contrib/src/trace/exporter/jaeger_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use opentelemetry::trace::{SpanId, TraceError};
use opentelemetry_sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
runtime::RuntimeChannel,
trace::{BatchMessage, Tracer, TracerProvider},
trace::{Tracer, TracerProvider},
};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use std::collections::HashMap;
Expand Down Expand Up @@ -213,7 +213,7 @@ fn opentelemetry_value_to_json(value: &opentelemetry::Value) -> (&str, serde_jso
///
/// [`RuntimeChannel`]: opentelemetry_sdk::runtime::RuntimeChannel
#[async_trait]
pub trait JaegerJsonRuntime: RuntimeChannel<BatchMessage> + std::fmt::Debug {
pub trait JaegerJsonRuntime: RuntimeChannel + std::fmt::Debug {
/// Create a new directory if the given path does not exist yet
async fn create_dir(&self, path: &Path) -> ExportResult;
/// Write the provided content to a new file at the given path
Expand Down
7 changes: 2 additions & 5 deletions opentelemetry-datadog/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use opentelemetry_sdk::{
export::trace::{ExportResult, SpanData, SpanExporter},
resource::{ResourceDetector, SdkProvidedResourceDetector},
runtime::RuntimeChannel,
trace::{BatchMessage, Config, Tracer, TracerProvider},
trace::{Config, Tracer, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions as semcov;
Expand Down Expand Up @@ -300,10 +300,7 @@ impl DatadogPipelineBuilder {

/// Install the Datadog trace exporter pipeline using a batch span processor with the specified
/// runtime.
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
mut self,
runtime: R,
) -> Result<Tracer, TraceError> {
pub fn install_batch<R: RuntimeChannel>(mut self, runtime: R) -> Result<Tracer, TraceError> {
let (config, service_name) = self.build_config_and_service_name();
let exporter = self.build_exporter_with_service_name(service_name)?;
let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter, runtime);
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-jaeger/src/exporter/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
))]
use crate::exporter::addrs_and_family;
use async_trait::async_trait;
use opentelemetry_sdk::{runtime::RuntimeChannel, trace::BatchMessage};
use opentelemetry_sdk::runtime::RuntimeChannel;
use std::net::ToSocketAddrs;

/// Jaeger Trace Runtime is an extension to [`RuntimeChannel`].
///
/// [`RuntimeChannel`]: opentelemetry_sdk::runtime::RuntimeChannel
#[async_trait]
pub trait JaegerTraceRuntime: RuntimeChannel<BatchMessage> + std::fmt::Debug {
pub trait JaegerTraceRuntime: RuntimeChannel + std::fmt::Debug {
/// A communication socket between Jaeger client and agent.
type Socket: std::fmt::Debug + Send + Sync;

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opentelemetry::{
global,
logs::{LogError, LoggerProvider},
};
use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel};
use opentelemetry_sdk::{self, export::logs::LogData, runtime::RuntimeChannel};

/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION";
Expand Down Expand Up @@ -166,7 +166,7 @@ impl OtlpLogPipeline<LogExporterBuilder> {
/// Returns a [`Logger`] with the name `opentelemetry-otlp` and the current crate version.
///
/// [`Logger`]: opentelemetry_sdk::logs::Logger
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
pub fn install_batch<R: RuntimeChannel>(
self,
runtime: R,
) -> Result<opentelemetry_sdk::logs::Logger, LogError> {
Expand Down Expand Up @@ -198,7 +198,7 @@ fn build_simple_with_exporter(
logger
}

fn build_batch_with_exporter<R: RuntimeChannel<BatchMessage>>(
fn build_batch_with_exporter<R: RuntimeChannel>(
exporter: LogExporter,
log_config: Option<opentelemetry_sdk::logs::Config>,
runtime: R,
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use opentelemetry::{
use opentelemetry_sdk::{
self as sdk,
export::trace::{ExportResult, SpanData},
trace::BatchMessage,
};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use sdk::runtime::RuntimeChannel;
Expand Down Expand Up @@ -122,7 +121,7 @@ impl OtlpTracePipeline<SpanExporterBuilder> {
/// `install_batch` will panic if not called within a tokio runtime
///
/// [`Tracer`]: opentelemetry::trace::Tracer
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
pub fn install_batch<R: RuntimeChannel>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
Expand Down Expand Up @@ -154,7 +153,7 @@ fn build_simple_with_exporter(
tracer
}

fn build_batch_with_exporter<R: RuntimeChannel<BatchMessage>>(
fn build_batch_with_exporter<R: RuntimeChannel>(
exporter: SpanExporter,
trace_config: Option<sdk::trace::Config>,
runtime: R,
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@

`should_sample` changes `attributes` from `OrderMap<Key, Value>` to
`Vec<KeyValue>`.
- **Breaking** Move type argument from `RuntimeChannel<T>` to associated types [#1314](https://github.com/open-telemetry/opentelemetry-rust/pull/1314)

### Removed

- Remove context from Metric force_flush [#1245](https://github.com/open-telemetry/opentelemetry-rust/pull/1245)
- Remove `logs::BatchMessage` and `trace::BatchMessage` types [#1314](https://github.com/open-telemetry/opentelemetry-rust/pull/1314)

### Fixed

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{BatchLogProcessor, BatchMessage, Config, LogProcessor, SimpleLogProcessor};
use super::{BatchLogProcessor, Config, LogProcessor, SimpleLogProcessor};
use crate::{
export::logs::{LogData, LogExporter},
runtime::RuntimeChannel,
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Builder {
}

/// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel<BatchMessage>>(
pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,19 @@ impl LogProcessor for SimpleLogProcessor {

/// A [`LogProcessor`] that asynchronously buffers log records and reports
/// them at a preconfigured interval.
pub struct BatchLogProcessor<R: RuntimeChannel<BatchMessage>> {
message_sender: R::Sender,
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,
}

impl<R: RuntimeChannel<BatchMessage>> Debug for BatchLogProcessor<R> {
impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("BatchLogProcessor")
.field("message_sender", &self.message_sender)
.finish()
}
}

impl<R: RuntimeChannel<BatchMessage>> LogProcessor for BatchLogProcessor<R> {
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, data: LogData) {
let result = self.message_sender.try_send(BatchMessage::ExportLog(data));

Expand Down Expand Up @@ -158,7 +158,7 @@ impl<R: RuntimeChannel<BatchMessage>> LogProcessor for BatchLogProcessor<R> {
}
}

impl<R: RuntimeChannel<BatchMessage>> BatchLogProcessor<R> {
impl<R: RuntimeChannel> BatchLogProcessor<R> {
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
Expand Down Expand Up @@ -262,7 +262,7 @@ async fn export_with_timeout<R, E>(
batch: Vec<LogData>,
) -> ExportResult
where
R: RuntimeChannel<BatchMessage>,
R: RuntimeChannel,
E: LogExporter + ?Sized,
{
if batch.is_empty() {
Expand Down Expand Up @@ -323,7 +323,7 @@ pub struct BatchLogProcessorBuilder<E, R> {
impl<E, R> BatchLogProcessorBuilder<E, R>
where
E: LogExporter + 'static,
R: RuntimeChannel<BatchMessage>,
R: RuntimeChannel,
{
/// Set max queue size for batches
pub fn with_max_queue_size(self, size: usize) -> Self {
Expand Down Expand Up @@ -372,7 +372,7 @@ where
/// Messages sent between application thread and batch log processor's work thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum BatchMessage {
enum BatchMessage {
/// Export logs, usually called when the log is emitted.
ExportLog(LogData),
/// Flush the current buffer to the backend, it can be triggered by
Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ mod log_processor;
pub use config::{config, Config};
pub use log_emitter::{Builder, Logger, LoggerProvider};
pub use log_processor::{
BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, BatchMessage, LogProcessor,
SimpleLogProcessor,
BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor,
};
52 changes: 32 additions & 20 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,22 @@ impl Runtime for AsyncStd {
}
}

/// `MessageRuntime` is an extension to [`Runtime`]. Currently, it provides a
/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a
/// channel that is used by the [log] and [span] batch processors.
///
/// [log]: crate::logs::BatchLogProcessor
/// [span]: crate::trace::BatchSpanProcessor
pub trait RuntimeChannel<T: Debug + Send>: Runtime {
pub trait RuntimeChannel: Runtime {
/// A future stream to receive batch messages from channels.
type Receiver: Stream<Item = T> + Send;
type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
/// A batch messages sender that can be sent across threads safely.
type Sender: TrySend<Message = T> + Debug;
type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;

/// Return the sender and receiver used to send batch messages.
fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver);
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>);
}

/// Error returned by a [`TrySend`] implementation.
Expand Down Expand Up @@ -187,11 +190,14 @@ impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {

#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl<T: Debug + Send> RuntimeChannel<T> for Tokio {
type Receiver = tokio_stream::wrappers::ReceiverStream<T>;
type Sender = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
impl RuntimeChannel for Tokio {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
Expand All @@ -202,11 +208,14 @@ impl<T: Debug + Send> RuntimeChannel<T> for Tokio {

#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl<T: Debug + Send> RuntimeChannel<T> for TokioCurrentThread {
type Receiver = tokio_stream::wrappers::ReceiverStream<T>;
type Sender = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
impl RuntimeChannel for TokioCurrentThread {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
Expand All @@ -229,11 +238,14 @@ impl<T: Send> TrySend for async_std::channel::Sender<T> {

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl<T: Debug + Send> RuntimeChannel<T> for AsyncStd {
type Receiver = async_std::channel::Receiver<T>;
type Sender = async_std::channel::Sender<T>;

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
impl RuntimeChannel for AsyncStd {
type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
type Sender<T: Debug + Send> = async_std::channel::Sender<T>;

fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
async_std::channel::bounded(capacity)
}
}
3 changes: 1 addition & 2 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ pub use sampler::{Sampler, ShouldSample};
pub use span::Span;
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchMessage, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor,
SpanProcessor,
BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor,
};
pub use tracer::Tracer;

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! not duplicate this data to avoid that different [`Tracer`] instances
//! of the [`TracerProvider`] have different versions of these data.
use crate::runtime::RuntimeChannel;
use crate::trace::{BatchMessage, BatchSpanProcessor, SimpleSpanProcessor, Tracer};
use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer};
use crate::{export::trace::SpanExporter, trace::SpanProcessor};
use crate::{InstrumentationLibrary, Resource};
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl Builder {
}

/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel<BatchMessage>>(
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
Expand Down
8 changes: 2 additions & 6 deletions opentelemetry-sdk/src/trace/runtime_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use crate::export::trace::{ExportResult, SpanExporter};
use crate::runtime;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::runtime::RuntimeChannel;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::trace::BatchMessage;
use futures_util::future::BoxFuture;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use opentelemetry::global::*;
Expand Down Expand Up @@ -42,7 +40,7 @@ impl SpanCountExporter {
}

#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
fn build_batch_tracer_provider<R: RuntimeChannel<BatchMessage>>(
fn build_batch_tracer_provider<R: RuntimeChannel>(
exporter: SpanCountExporter,
runtime: R,
) -> crate::trace::TracerProvider {
Expand All @@ -61,9 +59,7 @@ fn build_simple_tracer_provider(exporter: SpanCountExporter) -> crate::trace::Tr
}

#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
async fn test_set_provider_in_tokio<R: RuntimeChannel<BatchMessage>>(
runtime: R,
) -> Arc<AtomicUsize> {
async fn test_set_provider_in_tokio<R: RuntimeChannel>(runtime: R) -> Arc<AtomicUsize> {
let exporter = SpanCountExporter::new();
let span_count = exporter.span_count.clone();
let _ = set_tracer_provider(build_batch_tracer_provider(exporter, runtime));
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/trace/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Sampler {
where
C: HttpClient + 'static,
Sampler: ShouldSample,
R: crate::runtime::RuntimeChannel<crate::trace::BatchMessage>,
R: crate::runtime::RuntimeChannel,
Svc: Into<String>,
{
JaegerRemoteSamplerBuilder::new(runtime, http_client, default_sampler, service_name)
Expand Down
Loading

0 comments on commit 2022ace

Please sign in to comment.