Skip to content

Commit

Permalink
Replace oneshot shutdown signal with watch channel (#129)
Browse files Browse the repository at this point in the history
We need a single shutdown signal and oneshot doesn't allow
this so we use a watch channel instead.

Co-authored-by: Mark Mandel <markmandel@google.com>
  • Loading branch information
iffyio and markmandel authored Nov 11, 2020
1 parent d67fae7 commit 9464493
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
15 changes: 10 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use prometheus::Registry;
use quilkin::config::Config;
use quilkin::proxy::{logger, Builder, Metrics};
use tokio::signal;
use tokio::sync::oneshot;
use tokio::sync::watch;

const VERSION: &str = env!("CARGO_PKG_VERSION");

Expand Down Expand Up @@ -59,12 +59,17 @@ async fn main() {
.unwrap()
.build();

let (close, stop) = oneshot::channel::<()>();
let (shutdown_tx, mut shutdown_rx) = watch::channel::<()>(());
// Remove the init value from the channel - ensuring that the channel is
// empty so that we can terminate once we receive any value from it.
shutdown_rx.recv().await;
tokio::spawn(async move {
signal::ctrl_c().await.unwrap();
close.send(()).unwrap();
// Don't unwrap in order to ensure that we execute
// any subsequent shutdown tasks.
signal::ctrl_c().await.ok();
shutdown_tx.broadcast(()).ok();
});

server.run(stop).await.unwrap();
server.run(shutdown_rx).await.unwrap();
info!(log, "Shutting down");
}
8 changes: 4 additions & 4 deletions src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::proxy::sessions::metrics::Metrics as SessionMetrics;
use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder};
use slog::{info, warn, Logger};
use std::net::SocketAddr;
use tokio::sync::oneshot::Receiver;
use tokio::sync::watch::Receiver;
use warp::Filter as WarpFilter;

/// Metrics contains metrics configuration for the server.
Expand All @@ -19,7 +19,7 @@ pub struct Metrics {
pub fn start_metrics_server(
addr: SocketAddr,
registry: Registry,
shutdown_signal: Receiver<()>,
mut shutdown_rx: Receiver<()>,
log: Logger,
) {
info!(log, "starting metrics endpoint at {}", addr.to_string());
Expand All @@ -38,8 +38,8 @@ pub fn start_metrics_server(
.unwrap_or_else(|_| "# failed to gather metrics".to_string())
});

let (_, server) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
shutdown_signal.await.ok();
let (_, server) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async move {
let _ = shutdown_rx.recv().await;
});

tokio::spawn(server);
Expand Down
23 changes: 7 additions & 16 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

use std::collections::HashMap;
use std::io::{Error as IOError, ErrorKind};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::str::from_utf8;
use std::sync::Arc;
Expand All @@ -24,7 +23,7 @@ use slog::{debug, error, info, warn, Logger};
use tokio::io::Result;
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, watch};
use tokio::sync::{Mutex, RwLock};
use tokio::time::{delay_for, Duration, Instant};

Expand All @@ -48,20 +47,17 @@ pub struct Server {
impl Server {
/// start the async processing of incoming UDP packets. Will block until an
/// event is sent through the stop Receiver.
pub async fn run(self, stop: oneshot::Receiver<()>) -> Result<()> {
pub async fn run(self, mut shutdown_rx: watch::Receiver<()>) -> Result<()> {
self.log_config();

// Start metrics server if needed - it is shutdown before exiting the function.
let metrics_shutdown_tx = self.metrics.addr.map(|addr| {
let (metrics_shutdown_tx, metrics_shutdown_rx) = oneshot::channel();
if let Some(addr) = self.metrics.addr {
start_metrics_server(
addr,
self.metrics.registry.clone(),
metrics_shutdown_rx,
shutdown_rx.clone(),
self.log.clone(),
);
metrics_shutdown_tx
});
}

let (receive_socket, send_socket) = Server::bind(&self.config).await?.split();
// HashMap key is from,destination addresses as a tuple.
Expand All @@ -78,13 +74,8 @@ impl Server {
send_packets,
);

// convert to an IO error
let result = stop
.await
.map_err(|err| IOError::new(ErrorKind::BrokenPipe, err));

metrics_shutdown_tx.map(|tx| tx.send(()).ok());
result
let _ = shutdown_rx.recv().await;
Ok(())
}

/// run_prune_sessions starts the timer for pruning sessions and runs prune_sessions every
Expand Down
9 changes: 6 additions & 3 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub struct TestHelper {
pub log: Logger,
/// Channel to subscribe to, and trigger the shutdown of created resources.
shutdown_ch: Option<(watch::Sender<()>, watch::Receiver<()>)>,
server_shutdown_tx: Vec<Option<oneshot::Sender<()>>>,
server_shutdown_tx: Vec<Option<watch::Sender<()>>>,
}

/// Returned from [creating a socket](TestHelper::open_socket_and_recv_single_packet)
Expand Down Expand Up @@ -132,7 +132,7 @@ impl Drop for TestHelper {
.flatten()
{
shutdown_tx
.send(())
.broadcast(())
.map_err(|err| {
warn!(
log,
Expand Down Expand Up @@ -288,9 +288,12 @@ impl TestHelper {
filter_registry: FilterRegistry,
metrics: Metrics,
) {
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (shutdown_tx, mut shutdown_rx) = watch::channel::<()>(());
self.server_shutdown_tx.push(Some(shutdown_tx));
tokio::spawn(async move {
// Remove the init value from the channel - ensuring that the channel is
// empty so that we can terminate once we receive any value from it.
let _ = shutdown_rx.recv().await;
Builder::from(Arc::new(config))
.with_filter_registry(filter_registry)
.with_metrics(metrics)
Expand Down

0 comments on commit 9464493

Please sign in to comment.