Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace oneshot shutdown signal with watch channel #129

Merged
merged 2 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use prometheus::Registry;
use quilkin::config::Config;
use quilkin::proxy::{logger, Builder, Metrics};
use tokio::signal;
use tokio::sync::oneshot;
use tokio::sync::watch;

const VERSION: &str = env!("CARGO_PKG_VERSION");

Expand Down Expand Up @@ -59,12 +59,17 @@ async fn main() {
.unwrap()
.build();

let (close, stop) = oneshot::channel::<()>();
let (shutdown_tx, mut shutdown_rx) = watch::channel::<()>(());
// Remove the init value from the channel - ensuring that the channel is
// empty so that we can terminate once we receive any value from it.
shutdown_rx.recv().await;
tokio::spawn(async move {
signal::ctrl_c().await.unwrap();
close.send(()).unwrap();
// Don't unwrap in order to ensure that we execute
// any subsequent shutdown tasks.
signal::ctrl_c().await.ok();
shutdown_tx.broadcast(()).ok();
});

server.run(stop).await.unwrap();
server.run(shutdown_rx).await.unwrap();
info!(log, "Shutting down");
}
8 changes: 4 additions & 4 deletions src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::proxy::sessions::metrics::Metrics as SessionMetrics;
use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder};
use slog::{info, warn, Logger};
use std::net::SocketAddr;
use tokio::sync::oneshot::Receiver;
use tokio::sync::watch::Receiver;
use warp::Filter as WarpFilter;

/// Metrics contains metrics configuration for the server.
Expand All @@ -19,7 +19,7 @@ pub struct Metrics {
pub fn start_metrics_server(
addr: SocketAddr,
registry: Registry,
shutdown_signal: Receiver<()>,
mut shutdown_rx: Receiver<()>,
log: Logger,
) {
info!(log, "starting metrics endpoint at {}", addr.to_string());
Expand All @@ -38,8 +38,8 @@ pub fn start_metrics_server(
.unwrap_or_else(|_| "# failed to gather metrics".to_string())
});

let (_, server) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
shutdown_signal.await.ok();
let (_, server) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async move {
let _ = shutdown_rx.recv().await;
});

tokio::spawn(server);
Expand Down
23 changes: 7 additions & 16 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

use std::collections::HashMap;
use std::io::{Error as IOError, ErrorKind};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::str::from_utf8;
use std::sync::Arc;
Expand All @@ -24,7 +23,7 @@ use slog::{debug, error, info, warn, Logger};
use tokio::io::Result;
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, watch};
use tokio::sync::{Mutex, RwLock};
use tokio::time::{delay_for, Duration, Instant};

Expand All @@ -48,20 +47,17 @@ pub struct Server {
impl Server {
/// start the async processing of incoming UDP packets. Will block until an
/// event is sent through the stop Receiver.
pub async fn run(self, stop: oneshot::Receiver<()>) -> Result<()> {
pub async fn run(self, mut shutdown_rx: watch::Receiver<()>) -> Result<()> {
self.log_config();

// Start metrics server if needed - it is shutdown before exiting the function.
let metrics_shutdown_tx = self.metrics.addr.map(|addr| {
let (metrics_shutdown_tx, metrics_shutdown_rx) = oneshot::channel();
if let Some(addr) = self.metrics.addr {
start_metrics_server(
addr,
self.metrics.registry.clone(),
metrics_shutdown_rx,
shutdown_rx.clone(),
self.log.clone(),
);
metrics_shutdown_tx
});
}

let (receive_socket, send_socket) = Server::bind(&self.config).await?.split();
// HashMap key is from,destination addresses as a tuple.
Expand All @@ -78,13 +74,8 @@ impl Server {
send_packets,
);

// convert to an IO error
let result = stop
.await
.map_err(|err| IOError::new(ErrorKind::BrokenPipe, err));

metrics_shutdown_tx.map(|tx| tx.send(()).ok());
result
let _ = shutdown_rx.recv().await;
Ok(())
}

/// run_prune_sessions starts the timer for pruning sessions and runs prune_sessions every
Expand Down
9 changes: 6 additions & 3 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub struct TestHelper {
pub log: Logger,
/// Channel to subscribe to, and trigger the shutdown of created resources.
shutdown_ch: Option<(watch::Sender<()>, watch::Receiver<()>)>,
server_shutdown_tx: Vec<Option<oneshot::Sender<()>>>,
server_shutdown_tx: Vec<Option<watch::Sender<()>>>,
}

/// Returned from [creating a socket](TestHelper::open_socket_and_recv_single_packet)
Expand Down Expand Up @@ -132,7 +132,7 @@ impl Drop for TestHelper {
.flatten()
{
shutdown_tx
.send(())
.broadcast(())
.map_err(|err| {
warn!(
log,
Expand Down Expand Up @@ -288,9 +288,12 @@ impl TestHelper {
filter_registry: FilterRegistry,
metrics: Metrics,
) {
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (shutdown_tx, mut shutdown_rx) = watch::channel::<()>(());
self.server_shutdown_tx.push(Some(shutdown_tx));
tokio::spawn(async move {
// Remove the init value from the channel - ensuring that the channel is
// empty so that we can terminate once we receive any value from it.
let _ = shutdown_rx.recv().await;
Builder::from(Arc::new(config))
.with_filter_registry(filter_registry)
.with_metrics(metrics)
Expand Down