From 8674155c56e33b7b553694da7f0b0b97f45f348c Mon Sep 17 00:00:00 2001 From: James Nugent Date: Thu, 26 Mar 2020 18:21:39 -0500 Subject: [PATCH] feat(health): Add tonic-health server impl This commit adds a new crate `tonic-health` which implements the [standard gRPC Health Checking][checking] protocol. Currently there is only a server implementation, though others have noted in the discussion in #135 that client implementations exist which could also be imported as necessary. An example server has also been added - once the client work is done a client for this should be added also. [checking]: https://github.com/grpc/grpc/blob/master/doc/health-checking.md --- Cargo.toml | 3 + README.md | 4 + examples/Cargo.toml | 6 + examples/README.md | 7 + examples/src/health/server.rs | 68 ++++++++ tonic-health/Cargo.toml | 30 ++++ tonic-health/build.rs | 9 + tonic-health/proto/health.proto | 22 +++ tonic-health/src/lib.rs | 35 ++++ tonic-health/src/server.rs | 294 ++++++++++++++++++++++++++++++++ 10 files changed, 478 insertions(+) create mode 100644 examples/src/health/server.rs create mode 100644 tonic-health/Cargo.toml create mode 100644 tonic-health/build.rs create mode 100644 tonic-health/proto/health.proto create mode 100644 tonic-health/src/lib.rs create mode 100644 tonic-health/src/server.rs diff --git a/Cargo.toml b/Cargo.toml index 478564923..6d2ffea84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,9 +2,12 @@ members = [ "tonic", "tonic-build", + "tonic-health", + # Non-published crates "examples", "interop", + # Tests "tests/included_service", "tests/same_name", diff --git a/README.md b/README.md index 8ae4f2455..5d1018ac8 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ contains the tools to build clients and servers from [`protobuf`] definitions. - Load balancing - Custom metadata - Authentication +- Health Checking ## Getting Started @@ -67,6 +68,8 @@ question. If that doesn't work, try opening an [issue] with the question. 
- [`tonic`](https://github.com/hyperium/tonic/tree/master/tonic): Generic gRPC and HTTP/2 client/server implementation. - [`tonic-build`](https://github.com/hyperium/tonic/tree/master/tonic-build): [`prost`] based service codegen. +- [`tonic-health`](https://github.com/hyperium/tonic/tree/master/tonic-health): Implementation of the standard [gRPC +health checking service][healthcheck]. Also serves as an example of both unary and response streaming. - [`examples`](https://github.com/hyperium/tonic/tree/master/examples): Example gRPC implementations showing off tls, load balancing and bi-directional streaming. - [`interop`](https://github.com/hyperium/tonic/tree/master/interop): Interop tests implementation. @@ -105,3 +108,4 @@ terms or conditions. [Chat]: https://discord.gg/6yGkFeN [routeguide-tutorial]: https://github.com/hyperium/tonic/blob/master/examples/routeguide-tutorial.md [helloworld-tutorial]: https://github.com/hyperium/tonic/blob/master/examples/helloworld-tutorial.md +[healthcheck]: https://github.com/grpc/grpc/blob/master/doc/health-checking.md diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 1e2fddb2f..58148846a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -102,6 +102,10 @@ path = "src/hyper_warp/client.rs" name = "hyper-warp-server" path = "src/hyper_warp/server.rs" +[[bin]] +name = "health-server" +path = "src/health/server.rs" + [dependencies] tonic = { path = "../tonic", features = ["tls", "data-prost"] } prost = "0.6" @@ -126,6 +130,8 @@ warp = { version = "0.2", default-features = false } http = "0.2" http-body = "0.3" pin-project = "0.4" +# Health example +tonic-health = { path = "../tonic-health" } [build-dependencies] tonic-build = { path = "../tonic-build", features = ["prost"] } diff --git a/examples/README.md b/examples/README.md index de76c8578..d1217c7ea 100644 --- a/examples/README.md +++ b/examples/README.md @@ -72,6 +72,13 @@ $ cargo run --bin tls-client $ cargo run --bin tls-server ``` +## Health Checking 
+ +### Server + +```bash +$ cargo run --bin health-server +``` ### Notes: diff --git a/examples/src/health/server.rs b/examples/src/health/server.rs new file mode 100644 index 000000000..625272dd6 --- /dev/null +++ b/examples/src/health/server.rs @@ -0,0 +1,68 @@ +use tonic::{transport::Server, Request, Response, Status}; + +use hello_world::greeter_server::{Greeter, GreeterServer}; +use hello_world::{HelloReply, HelloRequest}; +use std::time::Duration; +use tokio::time::delay_for; +use tonic_health::server::HealthReporter; + +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +#[derive(Default)] +pub struct MyGreeter {} + +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + println!("Got a request from {:?}", request.remote_addr()); + + let reply = hello_world::HelloReply { + message: format!("Hello {}!", request.into_inner().name), + }; + Ok(Response::new(reply)) + } +} + +/// This function (somewhat improbably) flips the status of a service every second, in order +/// that the effect of `tonic_health::HealthReporter::watch` can be easily observed. 
+async fn twiddle_service_status(mut reporter: HealthReporter) { + let mut iter = 0u64; + loop { + iter += 1; + delay_for(Duration::from_secs(1)).await; + + if iter % 2 == 0 { + reporter.set_serving::>().await; + } else { + reporter.set_not_serving::>().await; + }; + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + health_reporter + .set_serving::>() + .await; + + tokio::spawn(twiddle_service_status(health_reporter.clone())); + + let addr = "[::1]:50051".parse().unwrap(); + let greeter = MyGreeter::default(); + + println!("HealthServer + GreeterServer listening on {}", addr); + + Server::builder() + .add_service(health_service) + .add_service(GreeterServer::new(greeter)) + .serve(addr) + .await?; + + Ok(()) +} diff --git a/tonic-health/Cargo.toml b/tonic-health/Cargo.toml new file mode 100644 index 000000000..d17b80d0d --- /dev/null +++ b/tonic-health/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "tonic-health" +version = "0.1.0" +authors = ["James Nugent "] +edition = "2018" +license = "MIT" +repository = "https://github.com/hyperium/tonic" +homepage = "https://github.com/hyperium/tonic" +description = """ +Health Checking module of `tonic` gRPC implementation. 
+""" +readme = "README.md" +categories = ["network-programming", "asynchronous"] +keywords = ["rpc", "grpc", "async", "healthcheck"] + +[features] +transport = ["tonic/transport"] + +[dependencies] +async-stream = "0.2" +tokio = { version = "0.2", features = ["sync", "stream"] } +tonic = { path = "../tonic", features = ["codegen", "data-prost"] } +bytes = "0.5" +prost = "0.6" + +[dev-dependencies] +tokio = { version = "0.2", features = ["rt-core", "macros"]} + +[build-dependencies] +tonic-build = { path = "../tonic-build" } \ No newline at end of file diff --git a/tonic-health/build.rs b/tonic-health/build.rs new file mode 100644 index 000000000..94758f147 --- /dev/null +++ b/tonic-health/build.rs @@ -0,0 +1,9 @@ +fn main() -> Result<(), Box> { + tonic_build::configure() + .build_server(true) + .build_client(false) + .format(true) + .compile(&["proto/health.proto"], &["proto/"])?; + + Ok(()) +} diff --git a/tonic-health/proto/health.proto b/tonic-health/proto/health.proto new file mode 100644 index 000000000..871b3d71b --- /dev/null +++ b/tonic-health/proto/health.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} \ No newline at end of file diff --git a/tonic-health/src/lib.rs b/tonic-health/src/lib.rs new file mode 100644 index 000000000..f3e3fa9f8 --- /dev/null +++ b/tonic-health/src/lib.rs @@ -0,0 +1,35 @@ +use std::fmt::{Display, Formatter}; + +mod proto { + tonic::include_proto!("grpc.health.v1"); +} + +pub mod server; + +/// An enumeration of values representing gRPC service health. 
+#[derive(Copy, Clone, Debug, PartialEq)] +pub enum ServingStatus { + Unknown, + Serving, + NotServing, +} + +impl Display for ServingStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ServingStatus::Unknown => f.write_str("Unknown"), + ServingStatus::Serving => f.write_str("Serving"), + ServingStatus::NotServing => f.write_str("NotServing"), + } + } +} + +impl From for proto::health_check_response::ServingStatus { + fn from(s: ServingStatus) -> Self { + match s { + ServingStatus::Unknown => proto::health_check_response::ServingStatus::Unknown, + ServingStatus::Serving => proto::health_check_response::ServingStatus::Serving, + ServingStatus::NotServing => proto::health_check_response::ServingStatus::NotServing, + } + } +} diff --git a/tonic-health/src/server.rs b/tonic-health/src/server.rs new file mode 100644 index 000000000..ecd6170e3 --- /dev/null +++ b/tonic-health/src/server.rs @@ -0,0 +1,294 @@ +use crate::proto::health_server::{Health, HealthServer}; +use crate::proto::{HealthCheckRequest, HealthCheckResponse}; +use crate::ServingStatus; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use tokio::stream::Stream; +use tokio::sync::{watch, RwLock}; +#[cfg(feature = "transport")] +use tonic::transport::NamedService; +use tonic::{Request, Response, Status}; + +/// Creates a `HealthReporter` and a linked `HealthServer` pair. Together, +/// these types can be used to serve the gRPC Health Checking service. +/// +/// A `HealthReporter` is used to update the state of gRPC services. +/// +/// A `HealthServer` is a Tonic gRPC server for the `grpc.health.v1.Health`, +/// which can be added to a Tonic runtime using `add_service` on the runtime +/// builder. 
+pub fn health_reporter() -> (HealthReporter, HealthServer) { + let reporter = HealthReporter::new(); + let service = HealthService::new(reporter.statuses.clone()); + let server = HealthServer::new(service); + + (reporter, server) +} + +type StatusPair = (watch::Sender, watch::Receiver); + +/// A handle providing methods to update the health status of gRPC services. A +/// `HealthReporter` is connected to a `HealthServer` which serves the statuses +/// over the `grpc.health.v1.Health` service. +#[derive(Clone, Debug)] +pub struct HealthReporter { + statuses: Arc>>, +} + +impl HealthReporter { + fn new() -> Self { + let statuses = Arc::new(RwLock::new(HashMap::new())); + + HealthReporter { statuses } + } + + /// Sets the status of the service implemented by `S` to `Serving`. This notifies any watchers + /// if there is a change in status. + #[cfg(feature = "transport")] + pub async fn set_serving(&mut self) + where + S: NamedService, + { + let service_name = ::NAME; + self.set_service_status(service_name, ServingStatus::Serving) + .await; + } + + /// Sets the status of the service implemented by `S` to `NotServing`. This notifies any watchers + /// if there is a change in status. + #[cfg(feature = "transport")] + pub async fn set_not_serving(&mut self) + where + S: NamedService, + { + let service_name = ::NAME; + self.set_service_status(service_name, ServingStatus::NotServing) + .await; + } + + /// Sets the status of the service with `service_name` to `status`. This notifies any watchers + /// if there is a change in status. 
+ pub async fn set_service_status(&mut self, service_name: S, status: ServingStatus) + where + S: AsRef, + { + let service_name = service_name.as_ref(); + let mut writer = self.statuses.write().await; + match writer.get(service_name) { + None => { + let _ = writer.insert(service_name.to_string(), watch::channel(status)); + } + Some((tx, rx)) => { + let mut rx = rx.clone(); + if rx.recv().await == Some(status) { + return; + } + + // We only ever hand out clones of the receiver, so the originally-created + // receiver should always be present, only being dropped when clearing the + // service status. Consequently, `tx.broadcast` should not fail, making use + // of `expect` here safe. + tx.broadcast(status).expect("channel should not be closed"); + } + }; + } + + /// Clear the status of the given service. + pub async fn clear_service_status(&mut self, service_name: &str) { + let mut writer = self.statuses.write().await; + let _ = writer.remove(service_name); + } +} + +struct HealthService { + statuses: Arc>>, +} + +impl HealthService { + fn new(services: Arc>>) -> Self { + HealthService { statuses: services } + } + + async fn service_health(&self, service_name: &str) -> Option { + let reader = self.statuses.read().await; + match reader.get(service_name).map(|p| p.1.clone()) { + None => None, + Some(mut receiver) => receiver.recv().await, + } + } +} + +#[tonic::async_trait] +impl Health for HealthService { + async fn check( + &self, + request: Request, + ) -> Result, Status> { + let service_name = request.get_ref().service.as_str(); + let status = self.service_health(service_name).await; + + match status { + None => Err(Status::not_found("service not registered")), + Some(status) => Ok(Response::new(HealthCheckResponse { + status: crate::proto::health_check_response::ServingStatus::from(status) as i32, + })), + } + } + + type WatchStream = + Pin> + Send + Sync + 'static>>; + + async fn watch( + &self, + request: Request, + ) -> Result, Status> { + let service_name = 
request.get_ref().service.as_str(); + let mut status_rx = match self.statuses.read().await.get(service_name) { + None => return Err(Status::not_found("service not registered")), + Some(pair) => pair.1.clone(), + }; + + let output = async_stream::try_stream! { + while let Some(status) = status_rx.recv().await { + yield HealthCheckResponse{ + status: crate::proto::health_check_response::ServingStatus::from(status) as i32, + }; + } + }; + + Ok(Response::new(Box::pin(output) as Self::WatchStream)) + } +} + +#[cfg(test)] +mod tests { + use crate::proto::health_server::Health; + use crate::proto::HealthCheckRequest; + use crate::server::{HealthReporter, HealthService}; + use crate::ServingStatus; + use std::collections::HashMap; + use std::sync::Arc; + use tokio::stream::StreamExt; + use tokio::sync::{watch, RwLock}; + use tonic::{Code, Request, Status}; + + fn assert_serving_status(wire: i32, expected: ServingStatus) { + let expected = crate::proto::health_check_response::ServingStatus::from(expected) as i32; + assert_eq!(wire, expected); + } + + fn assert_grpc_status(wire: Option, expected: Code) { + let wire = wire.expect("status is not None").code(); + assert_eq!(wire, expected); + } + + async fn make_test_service() -> (HealthReporter, HealthService) { + let state = Arc::new(RwLock::new(HashMap::new())); + state.write().await.insert( + "TestService".to_string(), + watch::channel(ServingStatus::Unknown), + ); + ( + HealthReporter { + statuses: state.clone(), + }, + HealthService::new(state.clone()), + ) + } + + #[tokio::test] + async fn test_service_check() { + let (mut reporter, service) = make_test_service().await; + + // Unregistered service + let resp = service + .check(Request::new(HealthCheckRequest { + service: "Unregistered".to_string(), + })) + .await; + assert!(resp.is_err()); + assert_grpc_status(resp.err(), Code::NotFound); + + // Registered service - initial state + let resp = service + .check(Request::new(HealthCheckRequest { + service: 
"TestService".to_string(), + })) + .await; + assert!(resp.is_ok()); + let resp = resp.unwrap().into_inner(); + assert_serving_status(resp.status, ServingStatus::Unknown); + + // Registered service - updated state + reporter + .set_service_status("TestService", ServingStatus::Serving) + .await; + let resp = service + .check(Request::new(HealthCheckRequest { + service: "TestService".to_string(), + })) + .await; + assert!(resp.is_ok()); + let resp = resp.unwrap().into_inner(); + assert_serving_status(resp.status, ServingStatus::Serving); + } + + #[tokio::test] + async fn test_service_watch() { + let (mut reporter, service) = make_test_service().await; + + // Unregistered service + let resp = service + .watch(Request::new(HealthCheckRequest { + service: "Unregistered".to_string(), + })) + .await; + assert!(resp.is_err()); + assert_grpc_status(resp.err(), Code::NotFound); + + // Registered service + let resp = service + .watch(Request::new(HealthCheckRequest { + service: "TestService".to_string(), + })) + .await; + assert!(resp.is_ok()); + let mut resp = resp.unwrap().into_inner(); + + // Registered service - initial state + let item = resp + .next() + .await + .expect("streamed response is Some") + .expect("response is ok"); + assert_serving_status(item.status, ServingStatus::Unknown); + + // Registered service - updated state + reporter + .set_service_status("TestService", ServingStatus::NotServing) + .await; + let item = resp + .next() + .await + .expect("streamed response is Some") + .expect("response is ok"); + assert_serving_status(item.status, ServingStatus::NotServing); + + // Registered service - updated state + reporter + .set_service_status("TestService", ServingStatus::Serving) + .await; + let item = resp + .next() + .await + .expect("streamed response is Some") + .expect("response is ok"); + assert_serving_status(item.status, ServingStatus::Serving); + + // De-registered service + reporter.clear_service_status("TestService").await; + let item = 
resp.next().await; + assert!(item.is_none()); + } +}