Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote write for metric-explorer-prometheus #542

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion metrics-exporter-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ categories = ["development-tools::debugging"]
keywords = ["metrics", "telemetry", "prometheus"]

[features]
default = ["http-listener", "push-gateway"]
default = ["http-listener", "push-gateway","remote-write"]
async-runtime = ["tokio", "hyper-util/tokio"]
http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"]
uds-listener = ["http-listener"]
push-gateway = ["async-runtime", "tracing", "_hyper-client"]
remote-write = ["_hyper-client","async-runtime","dep:prost","dep:snap","dep:prometheus-parse"]
_hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"]
_hyper-client = [
"http-body-util",
Expand All @@ -48,7 +49,10 @@ metrics-util = { version = "^0.18", path = "../metrics-util", default-features =
"registry",
"summary",
] }
prometheus-parse = {version = "0.2.5", optional = true}
prost = {workspace = true, optional = true}
quanta = { workspace = true }
snap = { version = "1.1.1", optional = true}
thiserror = { workspace = true }
tokio = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
Expand All @@ -63,6 +67,10 @@ tracing-subscriber = { workspace = true, features = ["fmt"] }
name = "prometheus_push_gateway"
required-features = ["push-gateway"]

[[example]]
name = "prometheus_remote_write"
required-features = ["remote-write"]

[[example]]
name = "prometheus_server"
required-features = ["http-listener"]
Expand Down
73 changes: 73 additions & 0 deletions metrics-exporter-prometheus/examples/prometheus_remote_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/// Make sure to run this example with `--features remote-write` to properly enable remote write support.
#[allow(unused_imports)]
use std::thread;
use std::time::Duration;

#[allow(unused_imports)]
use metrics::{counter, gauge, histogram};
use metrics::{describe_counter, describe_histogram};
#[allow(unused_imports)]
use metrics_exporter_prometheus::PrometheusBuilder;
#[allow(unused_imports)]
use metrics_util::MetricKindMask;

use quanta::Clock;
use rand::{thread_rng, Rng};

fn main() {
tracing_subscriber::fmt::init();

PrometheusBuilder::new()
.with_remote_write(
"http://127.0.0.1:9091/metrics/job/example",
Duration::from_secs(10),
"test-agent",
)
.expect("remote write endpoint should be valid")
.idle_timeout(
MetricKindMask::COUNTER | MetricKindMask::HISTOGRAM,
Some(Duration::from_secs(10)),
)
.install()
.expect("failed to install Prometheus recorder");

// We register these metrics, which gives us a chance to specify a description for them. The
// Prometheus exporter records this description and adds it as HELP text when the endpoint is
// scraped.
//
// Registering metrics ahead of using them is not required, but is the only way to specify the
// description of a metric.
describe_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far.");
describe_histogram!(
"tcp_server_loop_delta_secs",
"The time taken for iterations of the TCP server event loop."
);

let clock = Clock::new();
let mut last = None;

counter!("idle_metric").increment(1);
gauge!("testing").set(42.0);

// Loop over and over, pretending to do some work.
loop {
counter!("tcp_server_loops", "system" => "foo").increment(1);

if let Some(t) = last {
let delta: Duration = clock.now() - t;
histogram!("tcp_server_loop_delta_secs", "system" => "foo").record(delta);
}

let increment_gauge = thread_rng().gen_bool(0.75);
let gauge = gauge!("lucky_iterations");
if increment_gauge {
gauge.increment(1.0);
} else {
gauge.decrement(1.0);
}

last = Some(clock.now());

thread::sleep(Duration::from_millis(750));
}
}
4 changes: 3 additions & 1 deletion metrics-exporter-prometheus/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub enum BuildError {
/// The given push gateway endpoint is not a valid URI.
#[error("push gateway endpoint is not valid: {0}")]
InvalidPushGatewayEndpoint(String),

/// The given push gateway endpoint is not a valid URI.
#[error("remote write endpoint is not valid: {0}")]
InvalidRemoteWriteEndpoint(String),
/// No exporter configuration was present.
///
/// This generally only occurs when HTTP listener support is disabled, but no push gateway
Expand Down
35 changes: 35 additions & 0 deletions metrics-exporter-prometheus/src/exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,37 @@ impl PrometheusBuilder {
Ok(self)
}

/// Configures the exporter to push periodic requests to endpoint by [remote write protocol](https://prometheus.io/docs/specs/remote_write_spec/).
///
/// Running in remote write mode is mutually exclusive with the HTTP listener/push gateway i.e. enabling the remote write will
/// disable the HTTP listener/push gateway, and vise versa.
///
/// Defaults to disabled.
///
/// ## Errors
///
/// If the given endpoint cannot be parsed into a valid URI, an error variant will be returned describing the error.
///
#[cfg(feature = "remote-write")]
#[cfg_attr(docsrs, doc(cfg(feature = "remote-write")))]
pub fn with_remote_write<T>(
mut self,
endpoint: T,
interval: Duration,
user_agent: &str,
) -> Result<Self, BuildError>
where
T: AsRef<str>,
{
self.exporter_config = ExporterConfig::RemoteWrite {
endpoint: Uri::try_from(endpoint.as_ref())
.map_err(|e| BuildError::InvalidRemoteWriteEndpoint(e.to_string()))?,
interval,
user_agent: user_agent.to_string(),
};
Ok(self)
}

/// Configures the exporter to expose an HTTP listener that functions as a [scrape endpoint], listening on a Unix
/// Domain socket at the given path
///
Expand Down Expand Up @@ -486,6 +517,10 @@ impl PrometheusBuilder {
endpoint, interval, username, password, handle,
)
}
#[cfg(feature = "remote-write")]
ExporterConfig::RemoteWrite { endpoint, interval, user_agent } => {
super::remote_write::new_remote_write(endpoint, interval, handle, &user_agent)
}
},
))
}
Expand Down
12 changes: 12 additions & 0 deletions metrics-exporter-prometheus/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum ExporterConfig {
password: Option<String>,
},

// Run a remote write task sending to the given `endpoint` after `interval` time has elapsed,
// infinitely.
#[cfg(feature = "remote-write")]
RemoteWrite { endpoint: Uri, interval: Duration, user_agent: String },

#[allow(dead_code)]
Unconfigured,
}
Expand All @@ -60,6 +65,8 @@ impl ExporterConfig {
Self::HttpListener { .. } => "http-listener",
#[cfg(feature = "push-gateway")]
Self::PushGateway { .. } => "push-gateway",
#[cfg(feature = "remote-write")]
Self::RemoteWrite { .. } => "remote-write",
Self::Unconfigured => "unconfigured,",
}
}
Expand All @@ -71,4 +78,9 @@ mod http_listener;
#[cfg(feature = "push-gateway")]
mod push_gateway;

#[cfg(feature = "remote-write")]
mod remote_write;
#[cfg(feature = "remote-write")]
mod remote_write_proto;

pub(crate) mod builder;
75 changes: 75 additions & 0 deletions metrics-exporter-prometheus/src/exporter/remote_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::time::Duration;

use http_body_util::{BodyExt, Collected, Full};
use hyper::{body::Bytes, Uri};
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use tracing::error;

use crate::PrometheusHandle;

use super::{remote_write_proto::WriteRequest, ExporterFuture};

// Creates an ExporterFuture implementing a remote write.
pub(super) fn new_remote_write(
endpoint: Uri,
interval: Duration,
handle: PrometheusHandle,
user_agent: &str,
) -> ExporterFuture {
let user_agent = user_agent.to_string();
Box::pin(async move {
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("no native root CA certificates found")
.https_or_http()
.enable_http1()
.build();
let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new())
.pool_idle_timeout(Duration::from_secs(30))
.build(https);

loop {
// Sleep for `interval` amount of time, and then do a push.
tokio::time::sleep(interval).await;

let output = handle.render();
let binary = match WriteRequest::from_text_format(output) {
Ok(req) => req,
Err(err) => {
error!("failed to build output to remote write request: {}", err);
continue;
}
};

let req = match binary.build_http_request(&endpoint, &user_agent) {
Ok(req) => req,
Err(err) => {
error!("failed to build http remote write request {}", err);
continue;
}
};
match client.request(req).await {
Ok(response) => {
if !response.status().is_success() {
let status = response.status();
let status = status.canonical_reason().unwrap_or_else(|| status.as_str());
let body = response
.into_body()
.collect()
.await
.map(Collected::to_bytes)
.map_err(|_| ())
.and_then(|b| String::from_utf8(b[..].to_vec()).map_err(|_| ()))
.unwrap_or_else(|()| String::from("<failed to read response body>"));
error!(
message = "unexpected status after pushing metrics to remote write",
status,
%body,
);
}
}
Err(e) => error!("error sending request to remote write {}: {:?}", endpoint, e),
}
}
})
}
Loading