From c18c5f1215a7be5fdb7b54761141fd7e00c54142 Mon Sep 17 00:00:00 2001 From: Adam Lock Date: Wed, 4 Aug 2021 14:51:01 +0100 Subject: [PATCH] Rollback some changes --- client/src/comms/tcp_transport.rs | 122 +++++++++++++++--------------- client/src/session.rs | 40 +++++----- 2 files changed, 82 insertions(+), 80 deletions(-) diff --git a/client/src/comms/tcp_transport.rs b/client/src/comms/tcp_transport.rs index be9d5edba..76746f3b1 100644 --- a/client/src/comms/tcp_transport.rs +++ b/client/src/comms/tcp_transport.rs @@ -11,7 +11,7 @@ use std::{ collections::HashMap, net::{SocketAddr, ToSocketAddrs}, result::Result, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, thread, }; @@ -229,6 +229,8 @@ pub(crate) struct TcpTransport { connection_state: ConnectionStateMgr, /// Message queue for requests / responses message_queue: Arc>, + /// Tokio runtime + runtime: Arc>, } impl Drop for TcpTransport { @@ -247,25 +249,34 @@ impl TcpTransport { secure_channel: Arc>, session_state: Arc>, message_queue: Arc>, + single_threaded_executor: bool, ) -> TcpTransport { let connection_state = { let session_state = trace_read_lock_unwrap!(session_state); session_state.connection_state() }; + + let runtime = { + let mut builder = if !single_threaded_executor { + tokio::runtime::Builder::new_multi_thread() + } else { + tokio::runtime::Builder::new_current_thread() + }; + + builder.enable_all().build().unwrap() + }; + TcpTransport { session_state, secure_channel, connection_state, message_queue, + runtime: Arc::new(Mutex::new(runtime)), } } /// Connects the stream to the specified endpoint - pub fn connect( - &mut self, - runtime: &tokio::runtime::Runtime, - endpoint_url: &str, - ) -> Result<(), StatusCode> { + pub fn connect(&mut self, endpoint_url: &str) -> Result<(), StatusCode> { if self.is_connected() { panic!("Should not try to connect when already connected"); } @@ -296,34 +307,55 @@ impl TcpTransport { }; assert_eq!(addr.port(), port); - // The connection will be serviced on its own thread. When the thread terminates, the connection - // has also terminated. - self.spawn_connection_task(runtime, addr, endpoint_url); + let connection_task = { + let (connection_state, session_state, secure_channel, message_queue) = ( + self.connection_state.clone(), + self.session_state.clone(), + self.secure_channel.clone(), + self.message_queue.clone(), + ); + let endpoint_url = endpoint_url.to_string(); - debug!("Spawning task that waits on the connection state to change"); - let result = runtime.block_on(async move { - // Poll for the state to indicate connect is ready - debug!("Waiting for a connect (or failure to connect)"); - let mut timer = interval(Duration::from_millis(Self::WAIT_POLLING_TIMEOUT)); - loop { - timer.tick().await; - match self.connection_state.state() { - ConnectionState::Processing => { - debug!("Connected"); - return Ok(()); - } - ConnectionState::Finished(status_code) => { - error!("Connected failed with status {}", status_code); - return Err(StatusCode::BadConnectionClosed); - } - _ => { - // Still waiting for something to happen - } - } - } + let id = format!("client-connection-thread-{:?}", thread::current().id()); + Self::connection_task( + id, + addr, + connection_state, + endpoint_url, + session_state, + secure_channel, + message_queue, + ) + }; + + let runtime = self.runtime.clone(); + thread::spawn(move || { + debug!("Client tokio tasks are starting for connection"); + let runtime = trace_lock_unwrap!(runtime); + runtime.block_on(async move { + connection_task.await; + debug!("Client tokio tasks have stopped for connection"); + }); }); - result + // Poll for the state to indicate connect is ready + debug!("Waiting for a connect (or failure to connect)"); + loop { + match self.connection_state.state() { + ConnectionState::Processing => { + debug!("Connected"); + return Ok(()); + } + ConnectionState::Finished(status_code) => { + error!("Connected failed with status {}", status_code); + return Err(StatusCode::BadConnectionClosed); + } + _ => { + // Still waiting for something to happen + } + } + thread::sleep(Duration::from_millis(Self::WAIT_POLLING_TIMEOUT)) + } } /// Disconnects the stream from the server (if it is connected) @@ -343,34 +375,6 @@ impl TcpTransport { self.connection_state.is_connected() } - fn spawn_connection_task( - &self, - runtime: &tokio::runtime::Runtime, - addr: SocketAddr, - endpoint_url: &str, - ) { - let (connection_state, session_state, secure_channel, message_queue) = ( - self.connection_state.clone(), - self.session_state.clone(), - self.secure_channel.clone(), - self.message_queue.clone(), - ); - let endpoint_url = endpoint_url.to_string(); - - let id = format!("client-connection-thread-{:?}", thread::current().id()); - let connection_task = Self::connection_task( - id, - addr, - connection_state, - endpoint_url, - session_state, - secure_channel, - message_queue, - ); - - runtime.spawn(connection_task); - } - /// This is the main connection task for a connection. async fn connection_task( id: String, diff --git a/client/src/session.rs b/client/src/session.rs index 29ae7324e..fa69de465 100644 --- a/client/src/session.rs +++ b/client/src/session.rs @@ -159,8 +159,10 @@ pub struct Session { session_retry_policy: SessionRetryPolicy, /// Ignore clock skew between the client and the server. ignore_clock_skew: bool, + /// Single threaded executor flag (for TCP transport) + single_threaded_executor: bool, /// Tokio runtime - runtime: tokio::runtime::Runtime, + runtime: Arc>, } impl Drop for Session { @@ -216,19 +218,15 @@ impl Session { secure_channel.clone(), session_state.clone(), message_queue.clone(), + single_threaded_executor, ); let subscription_state = Arc::new(RwLock::new(SubscriptionState::new())); - let runtime = { - if !single_threaded_executor { - tokio::runtime::Builder::new_multi_thread() - } else { - tokio::runtime::Builder::new_current_thread() - } + // This runtime is single threaded. The one for the transport may be multi-threaded + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .unwrap() - }; + .unwrap(); Session { application_description, @@ -242,7 +240,8 @@ impl Session { message_queue, session_retry_policy, ignore_clock_skew, - runtime, + single_threaded_executor, + runtime: Arc::new(Mutex::new(runtime)), } } @@ -265,6 +264,7 @@ impl Session { self.secure_channel.clone(), self.session_state.clone(), self.message_queue.clone(), + self.single_threaded_executor, ); } @@ -576,8 +576,7 @@ impl Session { } // Transport's tokio runtime is made here, not in transport - self.transport - .connect(&self.runtime, endpoint_url.as_ref())?; + self.transport.connect(endpoint_url.as_ref())?; self.open_secure_channel()?; self.on_connection_status_change(true); Ok(()) @@ -630,7 +629,7 @@ impl Session { /// * `session` - the session to run ynchronously /// pub fn run(session: Arc>) { - let (_tx, rx) = oneshot::channel(); + let (tx, rx) = oneshot::channel(); Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx); } @@ -713,10 +712,6 @@ impl Session { /// /// # Arguments /// - /// * `runtime` - The Tokio runtime to execute the session task on - /// * `block_on` - A flag that says if the runtime should block on the task completion or not. - /// Normally this would be `true` but an external runtime might use `false` - /// to just spawn the task. /// * `session` - The session /// * `sleep_interval` - An internal polling timer in ms /// * `rx` - A receiver that the task uses to receive a quit command directly from the caller. @@ -733,10 +728,12 @@ impl Session { } }; // Spawn the task on the alloted runtime - thread::spawn(move || { + let runtime = { let session = trace_read_lock_unwrap!(session); - session.runtime.block_on(task); - }); + session.runtime.clone() + }; + let runtime = trace_lock_unwrap!(runtime); + runtime.block_on(task); } /// Polls on the session which basically dispatches any pending @@ -1119,7 +1116,8 @@ impl Session { ); let id = format!("session-activity-thread-{:?}", thread::current().id()); - self.runtime.spawn(async move { + let runtime = trace_lock_unwrap!(self.runtime); + runtime.spawn(async move { register_runtime_component!(&id); // The timer runs at a higher frequency timer loop to terminate as soon after the session // state has terminated. Each time it runs it will test if the interval has elapsed or not.