Skip to content

Commit

Permalink
Refactor server start up so caller can supply their own tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Oct 31, 2021
1 parent 141cab9 commit 4d6111d
Showing 1 changed file with 113 additions and 93 deletions.
206 changes: 113 additions & 93 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{

use tokio::{
self,
net::{TcpListener, TcpStream},
net::{TcpListener, TcpStream, ToSocketAddrs},
sync::oneshot::{self, Sender},
time::{interval_at, Duration, Instant},
};
Expand Down Expand Up @@ -223,8 +223,47 @@ impl Server {
/// Runs the supplied server and blocks until it completes either by aborting or
/// by error.
pub fn run_server(server: Arc<RwLock<Server>>) {
let single_threaded_executor = {
let server = trace_read_lock_unwrap!(server);
let server_state = trace_read_lock_unwrap!(server.server_state);
let config = trace_read_lock_unwrap!(server_state.config);
config.performance.single_threaded_executor
};
let server_task = Self::new_server_task(server);
// Launch
let mut builder = if !single_threaded_executor {
tokio::runtime::Builder::new_multi_thread()
} else {
tokio::runtime::Builder::new_current_thread()
};
let runtime = builder.enable_all().build().unwrap();
Self::run_server_on_runtime(runtime, server_task, true);
}

/// Allow the server to be run on a caller supplied runtime. If block is set, the task
/// runs to completion, otherwise, the task runs and a join handle is returned by this fn.
pub fn run_server_on_runtime<F>(
runtime: tokio::runtime::Runtime,
server_task: F,
block: bool,
) -> Option<tokio::task::JoinHandle<<F as futures::Future>::Output>>
where
F: std::future::Future + Send + 'static,
F::Output: Send + 'static,
{
if block {
runtime.block_on(server_task);
info!("Server has finished");
None
} else {
Some(runtime.spawn(server_task))
}
}

/// Returns the main server task - the loop that waits for connections and processes them.
pub async fn new_server_task(server: Arc<RwLock<Server>>) {
// Get the address and discovery url
let (sock_addr, discovery_server_url, single_threaded_executor) = {
let (sock_addr, discovery_server_url) = {
let server = trace_read_lock_unwrap!(server);

// Debug endpoints
Expand All @@ -246,112 +285,93 @@ impl Server {
None
};

(
sock_addr,
discovery_server_url,
config.performance.single_threaded_executor,
)
(sock_addr, discovery_server_url)
};

if sock_addr.is_none() {
error!("Cannot resolve server address, check configuration of server");
return;
match sock_addr {
None => {
error!("Cannot resolve server address, check configuration of server");
}
Some(sock_addr) => Self::server_task(server, sock_addr, discovery_server_url).await,
}
let sock_addr = sock_addr.unwrap();

// These are going to be used to abort the thread via the completion_pact
}

async fn server_task<A: ToSocketAddrs>(
server: Arc<RwLock<Server>>,
sock_addr: A,
discovery_server_url: Option<String>,
) {
// This is returned as the main server task
info!("Waiting for Connection");
// This is the main tokio task
let main_server_task = async {
// Listen for connections (or abort)
let listener = match TcpListener::bind(&sock_addr).await {
Ok(listener) => listener,
Err(err) => {
panic!("Could not bind to socket {:?}", err)
}
};
// Listen for connections (or abort)
let listener = match TcpListener::bind(&sock_addr).await {
Ok(listener) => listener,
Err(err) => {
panic!("Could not bind to socket {:?}", err)
}
};

let (tx_abort, rx_abort) = oneshot::channel();
let (tx_abort, rx_abort) = oneshot::channel();

// Put the server into a running state
// Put the server into a running state
{
let mut server = trace_write_lock_unwrap!(server);
// Running
{
let mut server = trace_write_lock_unwrap!(server);
// Running
{
let mut server_state = trace_write_lock_unwrap!(server.server_state);
server_state.start_time = DateTime::now();
server_state.set_state(ServerStateType::Running);
}

// Start a timer that registers the server with a discovery server
if let Some(ref discovery_server_url) = discovery_server_url {
server.start_discovery_server_registration_timer(discovery_server_url);
} else {
info!(
"Server has not set a discovery server url, so no registration will happen"
);
}
let mut server_state = trace_write_lock_unwrap!(server.server_state);
server_state.start_time = DateTime::now();
server_state.set_state(ServerStateType::Running);
}

// Start any pending polling action timers
server.start_pending_polling_actions();
// Start a timer that registers the server with a discovery server
if let Some(ref discovery_server_url) = discovery_server_url {
server.start_discovery_server_registration_timer(discovery_server_url);
} else {
info!("Server has not set a discovery server url, so no registration will happen");
}

// Start a server abort task loop
Self::start_abort_poll(server.clone(), tx_abort);

// This isn't nice syntax, but basically there are two async actions
// going on, one of which has to complete - either the listener breaks out of its
// loop, or the rx_abort receives an abort message.
tokio::select! {
_ = async {
loop {
match listener.accept().await {
Ok((socket, _addr)) => {
// Clear out dead sessions
info!("Handling new connection {:?}", socket);
// Check for abort
let mut server = trace_write_lock_unwrap!(server);
let is_abort = {
let server_state = trace_read_lock_unwrap!(server.server_state);
server_state.is_abort()
};
if is_abort {
info!("Server is aborting so it will not accept new connections");
break;
} else {
server.handle_connection(socket);
}
}
Err(e) => {
error!("couldn't accept connection to client: {:?}", e);
// Start any pending polling action timers
server.start_pending_polling_actions();
}

// Start a server abort task loop
Self::start_abort_poll(server.clone(), tx_abort);

// This isn't nice syntax, but basically there are two async actions
// going on, one of which has to complete - either the listener breaks out of its
// loop, or the rx_abort receives an abort message.
tokio::select! {
_ = async {
loop {
match listener.accept().await {
Ok((socket, _addr)) => {
// Clear out dead sessions
info!("Handling new connection {:?}", socket);
// Check for abort
let mut server = trace_write_lock_unwrap!(server);
let is_abort = {
let server_state = trace_read_lock_unwrap!(server.server_state);
server_state.is_abort()
};
if is_abort {
info!("Server is aborting so it will not accept new connections");
break;
} else {
server.handle_connection(socket);
}
}
Err(e) => {
error!("couldn't accept connection to client: {:?}", e);
}
}
// Help the rust type inferencer out
Ok::<_, tokio::io::Error>(())
} => {}
_ = rx_abort => {
info!("abort received");
}
// Help the rust type inferencer out
Ok::<_, tokio::io::Error>(())
} => {}
_ = rx_abort => {
info!("abort received");
}
info!("main server task is finished");
};

// Launch
let mut builder = if !single_threaded_executor {
tokio::runtime::Builder::new_multi_thread()
} else {
tokio::runtime::Builder::new_current_thread()
};

builder
.enable_all()
.build()
.unwrap()
.block_on(main_server_task);

info!("Server has finished");
}
info!("main server task is finished");
}

/// Returns the current [`ServerState`] for the server.
Expand Down

0 comments on commit 4d6111d

Please sign in to comment.