Skip to content

Commit

Permalink
refactor(metrics): remove Prometheus exporter and registry references
Browse files Browse the repository at this point in the history
Eliminate the Prometheus exporter-related code to streamline the metric reader with OTLP configuration only.

refactor(metrics): generalize metric reader type in HttpMetricsLayerBuilder
  • Loading branch information
ttys3 committed Dec 5, 2024
1 parent 0508e73 commit 79e8f88
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 105 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ opentelemetry-http = "0.27.0"
tokio = { version = "1.42", features = ["macros"] }

[patch.crates-io]
opentelemetry-prometheus = { git="https://github.com/ttys3/opentelemetry-rust.git", branch="opentelemetry-prometheus-sdk-0.26" }
opentelemetry-prometheus = { git="https://github.com/ttys3/opentelemetry-rust.git", branch="opentelemetry-prometheus-sdk-0.27" }
177 changes: 73 additions & 104 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use opentelemetry::metrics::{Counter, Histogram, UpDownCounter};
use opentelemetry::metrics::MeterProvider;

use opentelemetry_sdk::metrics::{
reader::{DefaultTemporalitySelector},
Temporality,
PeriodicReader, SdkMeterProvider,
};
use opentelemetry_sdk::resource::{EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector};
Expand Down Expand Up @@ -104,9 +104,6 @@ pub struct Metric {

#[derive(Clone)]
pub struct MetricState {
/// Prometheus Registry we used to gathering and exporting metrics in the export endpoint
registry: Option<Registry>,

/// hold the metrics we used in the middleware
pub metric: Metric,

Expand Down Expand Up @@ -161,29 +158,6 @@ const HTTP_REQ_SIZE_HISTOGRAM_BUCKETS: &[f64] = &[
10.0 * MB, // 10 MB
];

impl HttpMetricsLayer {
pub fn routes<S>(&self) -> Router<S> {
Router::new()
.route(self.path.as_str(), get(Self::exporter_handler))
.with_state(self.state.clone())
}

// TODO use a static global exporter like autometrics-rs?
// https://github.com/autometrics-dev/autometrics-rs/blob/d3e7bffeede43f6c77b6a992b0443c0fca34003f/autometrics/src/prometheus_exporter.rs#L10
pub async fn exporter_handler(state: State<MetricState>) -> impl IntoResponse {
// tracing::trace!("exporter_handler called");
match state.registry {
Some(ref registry) => {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&registry.gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}
None => "#no prometheus registry".to_string(),
}
}
}

/// A helper that instructs the metrics layer to ignore
/// certain paths.
Expand Down Expand Up @@ -235,18 +209,18 @@ impl Default for PathSkipper {
}

#[derive(Clone)]
pub struct HttpMetricsLayerBuilder {
pub struct HttpMetricsLayerBuilder<T: opentelemetry_sdk::metrics::reader::MetricReader + Send + Sync + 'static> {
service_name: Option<String>,
service_version: Option<String>,
prefix: Option<String>,
path: String,
labels: Option<HashMap<String, String>>,
skipper: PathSkipper,
is_tls: bool,
exporter: Option<String>,
metric_reader: Option<T>,
}

impl Default for HttpMetricsLayerBuilder {
impl<T: opentelemetry_sdk::metrics::reader::MetricReader + Send + Sync + 'static> Default for HttpMetricsLayerBuilder<T> {
fn default() -> Self {
Self {
service_name: None,
Expand All @@ -256,12 +230,12 @@ impl Default for HttpMetricsLayerBuilder {
labels: None,
skipper: PathSkipper::default(),
is_tls: false,
exporter: Some("prometheus".to_string()),
metric_reader: None,
}
}
}

impl HttpMetricsLayerBuilder {
impl<T: opentelemetry_sdk::metrics::reader::MetricReader + Send + Sync + 'static> HttpMetricsLayerBuilder<T> {
pub fn new() -> Self {
HttpMetricsLayerBuilder::default()
}
Expand Down Expand Up @@ -296,8 +270,8 @@ impl HttpMetricsLayerBuilder {
self
}

pub fn with_exporter(mut self, exporter: String) -> Self {
self.exporter = Some(exporter);
pub fn with_metric_reader(mut self, metric_reader: T) -> Self {
self.metric_reader = Some(metric_reader);
self
}

Expand Down Expand Up @@ -344,18 +318,15 @@ impl HttpMetricsLayerBuilder {
res
};

let mut registry = None;
let mut builder = SdkMeterProvider::builder().with_resource(res);

// exporter

if self.exporter == Some("otlp".to_string()) {
builder = builder.with_reader(self.build_otlp());
if let Some(metric_reader) = self.metric_reader {
builder = builder.with_reader(metric_reader);
} else {
let (reg, exporter) = self.build_prometheus();
registry = Some(reg);
builder = builder.with_reader(exporter);
builder = builder.with_reader(self.build_otlp());
}


let provider = builder.build();

Expand All @@ -364,50 +335,49 @@ impl HttpMetricsLayerBuilder {
// this must called after the global meter provider has ben initialized
// let meter = global::meter("axum-app");
// let meter = provider.meter("axum-app");
let meter = provider.versioned_meter(
env!("CARGO_PKG_NAME"),
Some(env!("CARGO_PKG_VERSION")),
Some("https://opentelemetry.io/schema/1.0.0"),
None,
let meter = provider.meter_with_scope(
opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
.with_version(env!("CARGO_PKG_VERSION"))
.with_schema_url("https://opentelemetry.io/schema/1.0.0")
.build(),
);

// requests_total
let requests_total = meter
.u64_counter("requests")
.with_description("How many HTTP requests processed, partitioned by status code and HTTP method.")
.init();
.build();

// request_duration_seconds
let req_duration = meter
.f64_histogram("http.server.request.duration")
.with_unit("s")
.with_description("The HTTP request latencies in seconds.")
.with_boundaries(HTTP_REQ_DURATION_HISTOGRAM_BUCKETS.to_vec())
.init();
.build();

// request_size_bytes
let req_size = meter
.u64_histogram("http.server.request.size")
.with_unit("By")
.with_description("The HTTP request sizes in bytes.")
.with_boundaries(HTTP_REQ_SIZE_HISTOGRAM_BUCKETS.to_vec())
.init();
.build();

let res_size = meter
.u64_histogram("http.server.response.size")
.with_unit("By")
.with_description("The HTTP reponse sizes in bytes.")
.with_boundaries(HTTP_REQ_SIZE_HISTOGRAM_BUCKETS.to_vec())
.init();
.build();

// no u64_up_down_counter because up_down_counter maybe < 0 since it allow negative values
let req_active = meter
.i64_up_down_counter("http.server.active_requests")
.with_description("The number of active HTTP requests.")
.init();
.build();

let meter_state = MetricState {
registry,
metric: Metric {
requests_total,
req_duration,
Expand All @@ -425,25 +395,11 @@ impl HttpMetricsLayerBuilder {
}
}

fn build_prometheus(&self) -> (Registry, impl opentelemetry_sdk::metrics::reader::MetricReader) {
let registry = if let Some(prefix) = self.prefix.clone() {
Registry::new_custom(Some(prefix), self.labels.clone()).expect("create prometheus registry")
} else {
Registry::new()
};
// init prometheus exporter
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();
(registry, exporter)
}

/// init otlp metrics exporter
/// read from env var:
/// OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS,OTEL_EXPORTER_OTLP_METRICS_TIMEOUT
/// ref https://github.com/tokio-rs/tracing-opentelemetry/blob/5e3354ec24debcfbf856bfd1eb7022459dca1e6a/examples/opentelemetry-otlp.rs#L32
fn build_otlp(&self) -> impl opentelemetry_sdk::metrics::reader::MetricReader {
fn build_otlp(&self) -> PeriodicReader {
let protocol = match env::var("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL")
.ok()
.or(env::var("OTEL_EXPORTER_OTLP_PROTOCOL").ok())
Expand All @@ -453,25 +409,22 @@ impl HttpMetricsLayerBuilder {
};

let exporter = if protocol.starts_with("http") {
opentelemetry_otlp::new_exporter()
.http()
.build_metrics_exporter(
Box::new(DefaultTemporalitySelector::new()),
)
opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_temporality(Temporality::default())
.build()
.unwrap()
} else {
opentelemetry_otlp::new_exporter()
.tonic()
.build_metrics_exporter(
Box::new(DefaultTemporalitySelector::new()),
)
opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_temporality(Temporality::default())
.build()
.unwrap()
};

let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio)
PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio)
.with_interval(std::time::Duration::from_secs(30))
.build();
reader
.build()
}
}

Expand Down Expand Up @@ -627,10 +580,7 @@ where
let res_size = response.body().size_hint().upper().unwrap_or(0);

let labels = [
KeyValue {
key: Key::from("http.request.method"),
value: Value::from(this.method.clone()),
},
KeyValue::new("http.request.method", this.method.clone()),
KeyValue::new("http.route", this.path.clone()),
KeyValue::new("http.response.status_code", status),
// server.address: Name of the local HTTP server that received the request.
Expand All @@ -657,6 +607,7 @@ where
#[cfg(test)]
mod tests {
use crate::HttpMetricsLayerBuilder;
use crate::HttpMetricsLayer;
use axum::extract::State;
use axum::routing::get;
use axum::Router;
Expand All @@ -665,8 +616,8 @@ mod tests {
use prometheus::{Encoder, Registry, TextEncoder};
use std::sync::Arc;

#[test]
fn test_prometheus_exporter() {
#[tokio::test]
async fn test_prometheus_exporter() {
let _cx = Context::current();

let registry = Registry::new();
Expand All @@ -677,16 +628,18 @@ mod tests {
.build()
.unwrap();

let provider = SdkMeterProvider::builder().with_reader(exporter).build();
let provider = SdkMeterProvider::builder()
.with_reader(exporter)
.build();

// init the global meter provider
global::set_meter_provider(provider.clone());

let meter = global::meter("my-app");

// Use two instruments
let counter = meter.u64_counter("a.counter").with_description("Counts things").init();
let recorder = meter.u64_histogram("a.histogram").with_description("Records values").init();
let counter = meter.u64_counter("a.counter").with_description("Counts things").build();
let recorder = meter.u64_histogram("a.histogram").with_description("Records values").build();

counter.add(100, &[KeyValue::new("key", "value")]);
recorder.record(100, &[KeyValue::new("key", "value")]);
Expand All @@ -699,12 +652,18 @@ mod tests {
println!("{}", String::from_utf8(result).unwrap());
}

#[test]
fn test_builder() {
let metrics = HttpMetricsLayerBuilder::new().build();
let _app = Router::new()
#[tokio::test]
async fn test_prom_exporter_builder() {
let metrics = HttpMetricsLayerBuilder::<opentelemetry_prometheus::PrometheusExporter>::new().build();
let _app = Router::<HttpMetricsLayer>::new()
// export metrics at `/metrics` endpoint
.merge(metrics.routes::<()>())
.route("/metrics", get(|| async {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}))
.route("/", get(handler))
.route("/hello", get(handler))
.route("/world", get(handler))
Expand All @@ -716,15 +675,20 @@ mod tests {
}
}

#[test]
fn test_builder_with_state_router() {
#[tokio::test]
async fn test_builder_with_state_router() {
#[derive(Clone)]
struct AppState {}

let metrics = HttpMetricsLayerBuilder::new().build();
let metrics = HttpMetricsLayerBuilder::<opentelemetry_prometheus::PrometheusExporter>::new().build();
let _app: Router<AppState> = Router::new()
// export metrics at `/metrics` endpoint
.merge(metrics.routes::<AppState>())
.route("/metrics", get(|| async {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}))
.route("/", get(handler))
.route("/hello", get(handler))
.route("/world", get(handler))
Expand All @@ -737,17 +701,22 @@ mod tests {
}
}

#[test]
fn test_builder_with_arced_skipper() {
#[tokio::test]
async fn test_builder_with_arced_skipper() {
#[derive(Clone)]
struct AppState {}

let metrics = HttpMetricsLayerBuilder::new()
let metrics = HttpMetricsLayerBuilder::<opentelemetry_prometheus::PrometheusExporter>::new()
.with_skipper(crate::PathSkipper::new_with_fn(Arc::new(|_: &str| true)))
.build();
let _app: Router<AppState> = Router::new()
// export metrics at `/metrics` endpoint
.merge(metrics.routes::<AppState>())
.route("/metrics", get(|| async {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}))
.route("/", get(handler))
// add the metrics middleware
.layer(metrics)
Expand Down

0 comments on commit 79e8f88

Please sign in to comment.