Skip to content

Commit

Permalink
Rollback some changes
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Aug 4, 2021
1 parent 1ed8caf commit c18c5f1
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 80 deletions.
122 changes: 63 additions & 59 deletions client/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
collections::HashMap,
net::{SocketAddr, ToSocketAddrs},
result::Result,
sync::{Arc, RwLock},
sync::{Arc, Mutex, RwLock},
thread,
};

Expand Down Expand Up @@ -229,6 +229,8 @@ pub(crate) struct TcpTransport {
connection_state: ConnectionStateMgr,
/// Message queue for requests / responses
message_queue: Arc<RwLock<MessageQueue>>,
/// Tokio runtime
runtime: Arc<Mutex<tokio::runtime::Runtime>>,
}

impl Drop for TcpTransport {
Expand All @@ -247,25 +249,34 @@ impl TcpTransport {
secure_channel: Arc<RwLock<SecureChannel>>,
session_state: Arc<RwLock<SessionState>>,
message_queue: Arc<RwLock<MessageQueue>>,
single_threaded_executor: bool,
) -> TcpTransport {
let connection_state = {
let session_state = trace_read_lock_unwrap!(session_state);
session_state.connection_state()
};

let runtime = {
let mut builder = if !single_threaded_executor {
tokio::runtime::Builder::new_multi_thread()
} else {
tokio::runtime::Builder::new_current_thread()
};

builder.enable_all().build().unwrap()
};

TcpTransport {
session_state,
secure_channel,
connection_state,
message_queue,
runtime: Arc::new(Mutex::new(runtime)),
}
}

/// Connects the stream to the specified endpoint
pub fn connect(
&mut self,
runtime: &tokio::runtime::Runtime,
endpoint_url: &str,
) -> Result<(), StatusCode> {
pub fn connect(&mut self, endpoint_url: &str) -> Result<(), StatusCode> {
if self.is_connected() {
panic!("Should not try to connect when already connected");
}
Expand Down Expand Up @@ -296,34 +307,55 @@ impl TcpTransport {
};
assert_eq!(addr.port(), port);

// The connection will be serviced on its own thread. When the thread terminates, the connection
// has also terminated.
self.spawn_connection_task(runtime, addr, endpoint_url);
let connection_task = {
let (connection_state, session_state, secure_channel, message_queue) = (
self.connection_state.clone(),
self.session_state.clone(),
self.secure_channel.clone(),
self.message_queue.clone(),
);
let endpoint_url = endpoint_url.to_string();

debug!("Spawning task that waits on the connection state to change");
let result = runtime.block_on(async move {
// Poll for the state to indicate connect is ready
debug!("Waiting for a connect (or failure to connect)");
let mut timer = interval(Duration::from_millis(Self::WAIT_POLLING_TIMEOUT));
loop {
timer.tick().await;
match self.connection_state.state() {
ConnectionState::Processing => {
debug!("Connected");
return Ok(());
}
ConnectionState::Finished(status_code) => {
error!("Connected failed with status {}", status_code);
return Err(StatusCode::BadConnectionClosed);
}
_ => {
// Still waiting for something to happen
}
}
}
let id = format!("client-connection-thread-{:?}", thread::current().id());
Self::connection_task(
id,
addr,
connection_state,
endpoint_url,
session_state,
secure_channel,
message_queue,
)
};

let runtime = self.runtime.clone();
thread::spawn(move || {
debug!("Client tokio tasks are starting for connection");
let runtime = trace_lock_unwrap!(runtime);
runtime.block_on(async move {
connection_task.await;
debug!("Client tokio tasks have stopped for connection");
});
});

result
// Poll for the state to indicate connect is ready
debug!("Waiting for a connect (or failure to connect)");
loop {
match self.connection_state.state() {
ConnectionState::Processing => {
debug!("Connected");
return Ok(());
}
ConnectionState::Finished(status_code) => {
error!("Connected failed with status {}", status_code);
return Err(StatusCode::BadConnectionClosed);
}
_ => {
// Still waiting for something to happen
}
}
thread::sleep(Duration::from_millis(Self::WAIT_POLLING_TIMEOUT))
}
}

/// Disconnects the stream from the server (if it is connected)
Expand All @@ -343,34 +375,6 @@ impl TcpTransport {
self.connection_state.is_connected()
}

fn spawn_connection_task(
&self,
runtime: &tokio::runtime::Runtime,
addr: SocketAddr,
endpoint_url: &str,
) {
let (connection_state, session_state, secure_channel, message_queue) = (
self.connection_state.clone(),
self.session_state.clone(),
self.secure_channel.clone(),
self.message_queue.clone(),
);
let endpoint_url = endpoint_url.to_string();

let id = format!("client-connection-thread-{:?}", thread::current().id());
let connection_task = Self::connection_task(
id,
addr,
connection_state,
endpoint_url,
session_state,
secure_channel,
message_queue,
);

runtime.spawn(connection_task);
}

/// This is the main connection task for a connection.
async fn connection_task(
id: String,
Expand Down
40 changes: 19 additions & 21 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ pub struct Session {
session_retry_policy: SessionRetryPolicy,
/// Ignore clock skew between the client and the server.
ignore_clock_skew: bool,
/// Single threaded executor flag (for TCP transport)
single_threaded_executor: bool,
/// Tokio runtime
runtime: tokio::runtime::Runtime,
runtime: Arc<Mutex<tokio::runtime::Runtime>>,
}

impl Drop for Session {
Expand Down Expand Up @@ -216,19 +218,15 @@ impl Session {
secure_channel.clone(),
session_state.clone(),
message_queue.clone(),
single_threaded_executor,
);
let subscription_state = Arc::new(RwLock::new(SubscriptionState::new()));

let runtime = {
if !single_threaded_executor {
tokio::runtime::Builder::new_multi_thread()
} else {
tokio::runtime::Builder::new_current_thread()
}
// This runtime is single threaded. The one for the transport may be multi-threaded
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
};
.unwrap();

Session {
application_description,
Expand All @@ -242,7 +240,8 @@ impl Session {
message_queue,
session_retry_policy,
ignore_clock_skew,
runtime,
single_threaded_executor,
runtime: Arc::new(Mutex::new(runtime)),
}
}

Expand All @@ -265,6 +264,7 @@ impl Session {
self.secure_channel.clone(),
self.session_state.clone(),
self.message_queue.clone(),
self.single_threaded_executor,
);
}

Expand Down Expand Up @@ -576,8 +576,7 @@ impl Session {
}

// Transport's tokio runtime is made here, not in transport
self.transport
.connect(&self.runtime, endpoint_url.as_ref())?;
self.transport.connect(endpoint_url.as_ref())?;
self.open_secure_channel()?;
self.on_connection_status_change(true);
Ok(())
Expand Down Expand Up @@ -630,7 +629,7 @@ impl Session {
/// * `session` - the session to run ynchronously
///
pub fn run(session: Arc<RwLock<Session>>) {
let (_tx, rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx);
}

Expand Down Expand Up @@ -713,10 +712,6 @@ impl Session {
///
/// # Arguments
///
/// * `runtime` - The Tokio runtime to execute the session task on
/// * `block_on` - A flag that says if the runtime should block on the task completion or not.
/// Normally this would be `true` but an external runtime might use `false`
/// to just spawn the task.
/// * `session` - The session
/// * `sleep_interval` - An internal polling timer in ms
/// * `rx` - A receiver that the task uses to receive a quit command directly from the caller.
Expand All @@ -733,10 +728,12 @@ impl Session {
}
};
// Spawn the task on the alloted runtime
thread::spawn(move || {
let runtime = {
let session = trace_read_lock_unwrap!(session);
session.runtime.block_on(task);
});
session.runtime.clone()
};
let runtime = trace_lock_unwrap!(runtime);
runtime.block_on(task);
}

/// Polls on the session which basically dispatches any pending
Expand Down Expand Up @@ -1119,7 +1116,8 @@ impl Session {
);

let id = format!("session-activity-thread-{:?}", thread::current().id());
self.runtime.spawn(async move {
let runtime = trace_lock_unwrap!(self.runtime);
runtime.spawn(async move {
register_runtime_component!(&id);
// The timer runs at a higher frequency timer loop to terminate as soon after the session
// state has terminated. Each time it runs it will test if the interval has elapsed or not.
Expand Down

0 comments on commit c18c5f1

Please sign in to comment.