Skip to content

Commit

Permalink
Replace warp with hyper for http server impl (#184)
Browse files Browse the repository at this point in the history
In order to help trim down dependencies mostly. Warp is
a wrapper for hyper with fancy things like routes which we don't need
and it brings it a bit of libraries of its own.

Co-authored-by: Mark Mandel <markmandel@google.com>
  • Loading branch information
iffyio and markmandel authored Feb 4, 2021
1 parent 6a73500 commit 61324cc
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 19 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ backoff = "0.2.1"
bytes = "1.0.1"
clap = "2.33.0"
humantime-serde = "1.0.0"
hyper = "0.14.2"
parking_lot = "0.11.0"
prometheus = { version = "0.9", default-features = false }
prost = "0.7.0"
Expand All @@ -35,10 +36,9 @@ serde_json = "1.0.60"
slog = "2.5.2"
slog-async = "2.4.0"
slog-json = "2.3.0"
tokio = { version = "1.0.2", features = ["full", "test-util", "parking_lot"] }
tokio = { version = "1.1.0", features = ["rt-multi-thread", "signal", "test-util", "parking_lot"] }
slog-term = "2.5.0"
rand = "0.7.3"
warp = "0.3.0"
base64 = "0.12.3"
futures-intrusive = "0.4.0"
base64-serde = "0.5.1"
Expand Down
83 changes: 66 additions & 17 deletions src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/

use crate::proxy::sessions::metrics::Metrics as SessionMetrics;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Response, Server as HyperServer, StatusCode};
use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder};
use slog::{info, warn, Logger};
use slog::{error, info, warn, Logger};
use std::convert::Infallible;
use std::net::SocketAddr;
use tokio::sync::watch::Receiver;
use warp::Filter as WarpFilter;

/// Metrics contains metrics configuration for the server.
#[derive(Clone)]
Expand All @@ -40,25 +42,72 @@ pub fn start_metrics_server(
) {
info!(log, "starting metrics endpoint at {}", addr.to_string());

let metrics_route = warp::path!("metrics").map(move || {
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder
.encode(&registry.gather(), &mut buffer)
.map_err(|err| warn!(log, "failed to encode metrics: {:?}", err))
.and_then(|_| {
String::from_utf8(buffer).map_err(|err| {
warn!(log, "failed to convert metrics to utf8: {:?}", err);
})
})
.unwrap_or_else(|_| "# failed to gather metrics".to_string())
let handler_log = log.clone();
let make_svc = make_service_fn(move |_conn| {
let registry = registry.clone();
let handler_log = handler_log.clone();
async move {
let registry = registry.clone();
let handler_log = handler_log.clone();
Ok::<_, Infallible>(service_fn(move |req| {
let registry = registry.clone();
let handler_log = handler_log.clone();
async move {
Ok::<_, Infallible>(handle_request(
handler_log,
req.method(),
req.uri().path(),
registry,
))
}
}))
}
});

let (_, server) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async move {
let _ = shutdown_rx.changed().await;
let server = HyperServer::bind(&addr)
.serve(make_svc)
.with_graceful_shutdown(async move {
shutdown_rx.changed().await.ok();
});

tokio::spawn(async move {
if let Err(err) = server.await {
error!(log, "metrics server exited with an error: {}", err);
}
});
}

fn handle_request(log: Logger, method: &Method, path: &str, registry: Registry) -> Response<Body> {
let mut response = Response::new(Body::empty());

match (method, path) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let body = encoder
.encode(&registry.gather(), &mut buffer)
.map_err(|err| warn!(log, "failed to encode metrics: {:?}", err))
.and_then(|_| {
String::from_utf8(buffer)
.map(Body::from)
.map_err(|err| warn!(log, "failed to convert metrics to utf8: {:?}", err))
});

match body {
Ok(body) => {
*response.body_mut() = body;
}
Err(_) => {
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
}
}
}
_ => {
*response.status_mut() = StatusCode::NOT_FOUND;
}
};

tokio::spawn(server);
response
}

impl Default for Metrics {
Expand Down

0 comments on commit 61324cc

Please sign in to comment.