Skip to content

Commit

Permalink
Fix #342: Async support for push_metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Chalmers <adam.s.chalmers@gmail.com>
  • Loading branch information
adamchalmers committed Feb 21, 2021
1 parent 8e2d9e8 commit c45cdcd
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ pub use self::histogram::{Histogram, HistogramOpts, HistogramTimer, HistogramVec
pub use self::metrics::Opts;
#[cfg(feature = "push")]
pub use self::push::{
hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics,
hostname_grouping_key, push_add_collector, push_add_collector_async, push_add_metrics,
push_add_metrics_async, push_collector, push_collector_async, push_metrics, push_metrics_async,
BasicAuthentication,
};
pub use self::registry::Registry;
Expand Down
140 changes: 125 additions & 15 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ use crate::registry::Registry;

const REQWEST_TIMEOUT_SEC: Duration = Duration::from_secs(10);

lazy_static! {
static ref HTTP_CLIENT: Client = Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}

/// `BasicAuthentication` holder for supporting `push` to Pushgateway endpoints
/// using Basic access authentication.
/// Can be passed to any `push_metrics` method.
Expand Down Expand Up @@ -60,6 +53,18 @@ pub fn push_metrics<S: BuildHasher>(
push(job, grouping, url, mfs, "PUT", basic_auth)
}

/// Functions just like `push_metrics`, except the metrics are pushed
/// asynchronously.
pub async fn push_metrics_async<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_async(job, grouping, url, mfs, "PUT", basic_auth).await
}

/// `push_add_metrics` works like `push_metrics`, but only previously pushed
/// metrics with the same name (and the same job and other grouping labels) will
/// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
Expand All @@ -73,16 +78,25 @@ pub fn push_add_metrics<S: BuildHasher>(
push(job, grouping, url, mfs, "POST", basic_auth)
}

/// `push_add_metrics_async` works like `push_metrics`, but async.
pub async fn push_add_metrics_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push(job, grouping, url, mfs, "POST", basic_auth)
}

const LABEL_NAME_JOB: &str = "job";

fn push<S: BuildHasher>(
fn configure_push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
) -> Result<(String, impl Encoder, Vec<u8>)> {
// Suppress clippy warning needless_pass_by_value.
let grouping = grouping;

Expand Down Expand Up @@ -145,7 +159,24 @@ fn push<S: BuildHasher>(
// Ignore error, `no metrics` and `no name`.
let _ = encoder.encode(&[mf], &mut buf);
}
Ok((push_url, encoder, buf))
}

fn push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
lazy_static! {
static ref HTTP_CLIENT: Client = Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}
let mut builder = HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Expand All @@ -159,14 +190,50 @@ fn push<S: BuildHasher>(
}

let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

async fn push_async<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs)?;
lazy_static! {
static ref ASYNC_HTTP_CLIENT: reqwest::Client = reqwest::Client::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}
let mut builder = ASYNC_HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Url::from_str(&push_url).unwrap(),
)
.header(CONTENT_TYPE, encoder.format_type())
.body(buf);

if let Some(BasicAuthentication { username, password }) = basic_auth {
builder = builder.basic_auth(username, Some(password));
}

match response.status() {
let response = builder
.send()
.await
.map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

fn handle_push_response(status: StatusCode, push_url: String) -> Result<()> {
match status {
StatusCode::ACCEPTED => Ok(()),
StatusCode::OK => Ok(()),
_ => Err(Error::Msg(format!(
"unexpected status code {} while pushing to {}",
response.status(),
push_url
status, push_url
))),
}
}
Expand All @@ -188,6 +255,23 @@ fn push_from_collector<S: BuildHasher>(
push(job, grouping, url, mfs, method, basic_auth)
}

async fn push_from_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
method: &'a str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let registry = Registry::new();
for bc in collectors {
registry.register(bc)?;
}

let mfs = registry.gather();
push_async(job, grouping, url, mfs, method, basic_auth).await
}

/// `push_collector` push metrics collected from the provided collectors. It is
/// a convenient way to push only a few metrics.
pub fn push_collector<S: BuildHasher>(
Expand All @@ -200,7 +284,20 @@ pub fn push_collector<S: BuildHasher>(
push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
}

/// `push_add_collector` works like `push_add_metrics`, it collects from the
/// `push_collector_async` is just an async version of `push_collector`.
/// Pushes metrics collected from the provided collectors. It is
/// a convenient way to push only a few metrics.
pub async fn push_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_from_collector_async(job, grouping, url, collectors, "PUT", basic_auth).await
}

/// `push_add_collector` works like `push_add_collector`, it collects from the
/// provided collectors. It is a convenient way to push only a few metrics.
pub fn push_add_collector<S: BuildHasher>(
job: &str,
Expand All @@ -212,6 +309,19 @@ pub fn push_add_collector<S: BuildHasher>(
push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
}

/// `push_add_collector_async` works like `push_add_collector`, but async.
/// It collects from the provided collectors. It is a convenient way to push
/// only a few metrics.
pub async fn push_add_collector_async<'a, S: BuildHasher>(
job: &'a str,
grouping: HashMap<String, String, S>,
url: &'a str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_from_collector_async(job, grouping, url, collectors, "POST", basic_auth).await
}

const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");

/// `hostname_grouping_key` returns a label map with the only entry
Expand Down

0 comments on commit c45cdcd

Please sign in to comment.