diff --git a/server/src/server.rs b/server/src/server.rs index e23b4683e..46927ee72 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -12,7 +12,7 @@ use std::{ use tokio::{ self, - net::{TcpListener, TcpStream}, + net::{TcpListener, TcpStream, ToSocketAddrs}, sync::oneshot::{self, Sender}, time::{interval_at, Duration, Instant}, }; @@ -223,8 +223,47 @@ impl Server { /// Runs the supplied server and blocks until it completes either by aborting or /// by error. pub fn run_server(server: Arc>) { + let single_threaded_executor = { + let server = trace_read_lock_unwrap!(server); + let server_state = trace_read_lock_unwrap!(server.server_state); + let config = trace_read_lock_unwrap!(server_state.config); + config.performance.single_threaded_executor + }; + let server_task = Self::new_server_task(server); + // Launch + let mut builder = if !single_threaded_executor { + tokio::runtime::Builder::new_multi_thread() + } else { + tokio::runtime::Builder::new_current_thread() + }; + let runtime = builder.enable_all().build().unwrap(); + Self::run_server_on_runtime(runtime, server_task, true); + } + + /// Allow the server to be run on a caller supplied runtime. If block is set, the task + /// runs to completion, otherwise, the task runs and a join handle is returned by this fn. + pub fn run_server_on_runtime( + runtime: tokio::runtime::Runtime, + server_task: F, + block: bool, + ) -> Option::Output>> + where + F: std::future::Future + Send + 'static, + F::Output: Send + 'static, + { + if block { + runtime.block_on(server_task); + info!("Server has finished"); + None + } else { + Some(runtime.spawn(server_task)) + } + } + + /// Returns the main server task - the loop that waits for connections and processes them. + pub async fn new_server_task(server: Arc>) { // Get the address and discovery url - let (sock_addr, discovery_server_url, single_threaded_executor) = { + let (sock_addr, discovery_server_url) = { let server = trace_read_lock_unwrap!(server); // Debug endpoints @@ -246,112 +285,93 @@ impl Server { None }; - ( - sock_addr, - discovery_server_url, - config.performance.single_threaded_executor, - ) + (sock_addr, discovery_server_url) }; - - if sock_addr.is_none() { - error!("Cannot resolve server address, check configuration of server"); - return; + match sock_addr { + None => { + error!("Cannot resolve server address, check configuration of server"); + } + Some(sock_addr) => Self::server_task(server, sock_addr, discovery_server_url).await, } - let sock_addr = sock_addr.unwrap(); - - // These are going to be used to abort the thread via the completion_pact + } + async fn server_task( + server: Arc>, + sock_addr: A, + discovery_server_url: Option, + ) { + // This is returned as the main server task info!("Waiting for Connection"); - // This is the main tokio task - let main_server_task = async { - // Listen for connections (or abort) - let listener = match TcpListener::bind(&sock_addr).await { - Ok(listener) => listener, - Err(err) => { - panic!("Could not bind to socket {:?}", err) - } - }; + // Listen for connections (or abort) + let listener = match TcpListener::bind(&sock_addr).await { + Ok(listener) => listener, + Err(err) => { + panic!("Could not bind to socket {:?}", err) + } + }; - let (tx_abort, rx_abort) = oneshot::channel(); + let (tx_abort, rx_abort) = oneshot::channel(); - // Put the server into a running state + // Put the server into a running state + { + let mut server = trace_write_lock_unwrap!(server); + // Running { - let mut server = trace_write_lock_unwrap!(server); - // Running - { - let mut server_state = trace_write_lock_unwrap!(server.server_state); - server_state.start_time = DateTime::now(); - server_state.set_state(ServerStateType::Running); - } - - // Start a timer that registers the server with a discovery server - if let Some(ref discovery_server_url) = discovery_server_url { - server.start_discovery_server_registration_timer(discovery_server_url); - } else { - info!( - "Server has not set a discovery server url, so no registration will happen" - ); - } + let mut server_state = trace_write_lock_unwrap!(server.server_state); + server_state.start_time = DateTime::now(); + server_state.set_state(ServerStateType::Running); + } - // Start any pending polling action timers - server.start_pending_polling_actions(); + // Start a timer that registers the server with a discovery server + if let Some(ref discovery_server_url) = discovery_server_url { + server.start_discovery_server_registration_timer(discovery_server_url); + } else { + info!("Server has not set a discovery server url, so no registration will happen"); } - // Start a server abort task loop - Self::start_abort_poll(server.clone(), tx_abort); - - // This isn't nice syntax, but basically there are two async actions - // going on, one of which has to complete - either the listener breaks out of its - // loop, or the rx_abort receives an abort message. - tokio::select! { - _ = async { - loop { - match listener.accept().await { - Ok((socket, _addr)) => { - // Clear out dead sessions - info!("Handling new connection {:?}", socket); - // Check for abort - let mut server = trace_write_lock_unwrap!(server); - let is_abort = { - let server_state = trace_read_lock_unwrap!(server.server_state); - server_state.is_abort() - }; - if is_abort { - info!("Server is aborting so it will not accept new connections"); - break; - } else { - server.handle_connection(socket); - } - } - Err(e) => { - error!("couldn't accept connection to client: {:?}", e); + // Start any pending polling action timers + server.start_pending_polling_actions(); + } + + // Start a server abort task loop + Self::start_abort_poll(server.clone(), tx_abort); + + // This isn't nice syntax, but basically there are two async actions + // going on, one of which has to complete - either the listener breaks out of its + // loop, or the rx_abort receives an abort message. + tokio::select! { + _ = async { + loop { + match listener.accept().await { + Ok((socket, _addr)) => { + // Clear out dead sessions + info!("Handling new connection {:?}", socket); + // Check for abort + let mut server = trace_write_lock_unwrap!(server); + let is_abort = { + let server_state = trace_read_lock_unwrap!(server.server_state); + server_state.is_abort() + }; + if is_abort { + info!("Server is aborting so it will not accept new connections"); + break; + } else { + server.handle_connection(socket); } } + Err(e) => { + error!("couldn't accept connection to client: {:?}", e); + } } - // Help the rust type inferencer out - Ok::<_, tokio::io::Error>(()) - } => {} - _ = rx_abort => { - info!("abort received"); } + // Help the rust type inferencer out + Ok::<_, tokio::io::Error>(()) + } => {} + _ = rx_abort => { + info!("abort received"); } - info!("main server task is finished"); - }; - - // Launch - let mut builder = if !single_threaded_executor { - tokio::runtime::Builder::new_multi_thread() - } else { - tokio::runtime::Builder::new_current_thread() - }; - - builder - .enable_all() - .build() - .unwrap() - .block_on(main_server_task); - - info!("Server has finished"); + } + info!("main server task is finished"); } /// Returns the current [`ServerState`] for the server.