From 61324cc1b70a36a0b7c8f8e4da7df172d391ec97 Mon Sep 17 00:00:00 2001 From: Ifeanyi Ubah Date: Thu, 4 Feb 2021 01:36:21 +0100 Subject: [PATCH] Replace warp with hyper for http server impl (#184) In order to help trim down dependencies mostly. Warp is a wrapper for hyper with fancy things like routes which we don't need and it brings it a bit of libraries of its own. Co-authored-by: Mark Mandel --- Cargo.toml | 4 +-- src/proxy/metrics.rs | 83 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 10ef7f65f2..b3f714e4c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ backoff = "0.2.1" bytes = "1.0.1" clap = "2.33.0" humantime-serde = "1.0.0" +hyper = "0.14.2" parking_lot = "0.11.0" prometheus = { version = "0.9", default-features = false } prost = "0.7.0" @@ -35,10 +36,9 @@ serde_json = "1.0.60" slog = "2.5.2" slog-async = "2.4.0" slog-json = "2.3.0" -tokio = { version = "1.0.2", features = ["full", "test-util", "parking_lot"] } +tokio = { version = "1.1.0", features = ["rt-multi-thread", "signal", "test-util", "parking_lot"] } slog-term = "2.5.0" rand = "0.7.3" -warp = "0.3.0" base64 = "0.12.3" futures-intrusive = "0.4.0" base64-serde = "0.5.1" diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index 3a4524d22f..c7b9d136ea 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -15,11 +15,13 @@ */ use crate::proxy::sessions::metrics::Metrics as SessionMetrics; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Response, Server as HyperServer, StatusCode}; use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder}; -use slog::{info, warn, Logger}; +use slog::{error, info, warn, Logger}; +use std::convert::Infallible; use std::net::SocketAddr; use tokio::sync::watch::Receiver; -use warp::Filter as WarpFilter; /// Metrics contains metrics configuration for the server. #[derive(Clone)] @@ -40,25 +42,72 @@ pub fn start_metrics_server( ) { info!(log, "starting metrics endpoint at {}", addr.to_string()); - let metrics_route = warp::path!("metrics").map(move || { - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - encoder - .encode(®istry.gather(), &mut buffer) - .map_err(|err| warn!(log, "failed to encode metrics: {:?}", err)) - .and_then(|_| { - String::from_utf8(buffer).map_err(|err| { - warn!(log, "failed to convert metrics to utf8: {:?}", err); - }) - }) - .unwrap_or_else(|_| "# failed to gather metrics".to_string()) + let handler_log = log.clone(); + let make_svc = make_service_fn(move |_conn| { + let registry = registry.clone(); + let handler_log = handler_log.clone(); + async move { + let registry = registry.clone(); + let handler_log = handler_log.clone(); + Ok::<_, Infallible>(service_fn(move |req| { + let registry = registry.clone(); + let handler_log = handler_log.clone(); + async move { + Ok::<_, Infallible>(handle_request( + handler_log, + req.method(), + req.uri().path(), + registry, + )) + } + })) + } }); - let (_, server) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async move { - let _ = shutdown_rx.changed().await; + let server = HyperServer::bind(&addr) + .serve(make_svc) + .with_graceful_shutdown(async move { + shutdown_rx.changed().await.ok(); + }); + + tokio::spawn(async move { + if let Err(err) = server.await { + error!(log, "metrics server exited with an error: {}", err); + } }); +} + +fn handle_request(log: Logger, method: &Method, path: &str, registry: Registry) -> Response { + let mut response = Response::new(Body::empty()); + + match (method, path) { + (&Method::GET, "/metrics") => { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let body = encoder + .encode(®istry.gather(), &mut buffer) + .map_err(|err| warn!(log, "failed to encode metrics: {:?}", err)) + .and_then(|_| { + String::from_utf8(buffer) + .map(Body::from) + .map_err(|err| warn!(log, "failed to convert metrics to utf8: {:?}", err)) + }); + + match body { + Ok(body) => { + *response.body_mut() = body; + } + Err(_) => { + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + } + } + } + _ => { + *response.status_mut() = StatusCode::NOT_FOUND; + } + }; - tokio::spawn(server); + response } impl Default for Metrics {