diff --git a/lib/src/client/session/session.rs b/lib/src/client/session/session.rs index eb1cb525a..256e7b2ab 100644 --- a/lib/src/client/session/session.rs +++ b/lib/src/client/session/session.rs @@ -187,19 +187,24 @@ impl Session { Role::Client, decoding_options, ))); + let message_queue = Arc::new(RwLock::new(MessageQueue::new())); + + let subscription_state = Arc::new(RwLock::new(SubscriptionState::new())); + let session_state = Arc::new(RwLock::new(SessionState::new( ignore_clock_skew, secure_channel.clone(), + subscription_state.clone(), message_queue.clone(), ))); + let transport = TcpTransport::new( secure_channel.clone(), session_state.clone(), message_queue.clone(), single_threaded_executor, ); - let subscription_state = Arc::new(RwLock::new(SubscriptionState::new())); // This runtime is single threaded. The one for the transport may be multi-threaded let runtime = tokio::runtime::Builder::new_current_thread() @@ -235,6 +240,7 @@ impl Session { self.session_state = Arc::new(RwLock::new(SessionState::new( self.ignore_clock_skew, self.secure_channel.clone(), + self.subscription_state.clone(), self.message_queue.clone(), ))); @@ -857,6 +863,69 @@ impl Session { }); } + /// Start a task that will periodically send a publish request to keep the subscriptions alive. + /// The request rate will be 3/4 of the shortest (revised publishing interval * the revised keep + /// alive count) of all subscriptions that belong to a single session. + fn spawn_subscription_activity_task(&self) { + session_debug!(self, "spawn_subscription_activity_task",); + + let connection_state = { + let session_state = trace_read_lock!(self.session_state); + session_state.connection_state() + }; + + const MIN_SUBSCRIPTION_ACTIVITY_MS: u64 = 1000; + let session_state = self.session_state.clone(); + let subscription_state = self.subscription_state.clone(); + + let id = format!("subscription-activity-thread-{:?}", thread::current().id()); + let runtime = trace_lock!(self.runtime); + runtime.spawn(async move { + register_runtime_component!(&id); + + // The timer runs at a higher frequency timer loop to terminate as soon after the session + // state has terminated. Each time it runs it will test if the interval has elapsed or not. + let mut timer = interval(Duration::from_millis(MIN_SUBSCRIPTION_ACTIVITY_MS)); + + let mut last_timeout: Instant; + let mut subscription_activity_interval: Duration; + + loop { + timer.tick().await; + + if connection_state.is_finished() { + info!("Session activity timer is terminating"); + break; + } + + if let (Some(keep_alive_timeout), last_publish_request) = { + let subscription_state = trace_read_lock!(subscription_state); + ( + subscription_state.keep_alive_timeout(), + subscription_state.last_publish_request(), + ) + } { + subscription_activity_interval = + Duration::from_millis((keep_alive_timeout / 4) * 3); + last_timeout = last_publish_request; + + // Get the time now + let now = Instant::now(); + + // Calculate to interval since last check + let interval = now - last_timeout; + if interval > subscription_activity_interval { + let mut session_state = trace_write_lock!(session_state); + let _ = session_state.async_publish(); + } + } + } + + info!("Subscription activity timer task is finished"); + deregister_runtime_component!(&id); + }); + } + /// This is the internal handler for create subscription that receives the callback wrapped up and reference counted. fn create_subscription_inner( &self, @@ -1254,7 +1323,10 @@ impl Session { // Turn off publish requests until server says otherwise debug!("Server tells us too many publish requests so waiting for a response before resuming"); } - StatusCode::BadSessionClosed | StatusCode::BadSessionIdInvalid => { + StatusCode::BadSessionClosed + | StatusCode::BadSessionIdInvalid + | StatusCode::BadNoSubscription + | StatusCode::BadSubscriptionIdInvalid => { let mut session_state = trace_write_lock!(self.session_state); session_state.on_session_closed(service_result) } @@ -1446,6 +1518,13 @@ impl SessionService for Session { let _ = secure_channel .set_remote_cert_from_byte_string(&response.server_certificate); } + // When ignoring clock skew, we calculate the time offset between the client + // and the server and use that to compensate for the difference in time. + if self.ignore_clock_skew && !response.response_header.timestamp.is_null() { + let offset = response.response_header.timestamp - DateTime::now(); + // Update the client offset by adding the new offset. + session_state.set_client_offset(offset); + } session_state.session_id() }; @@ -1496,6 +1575,7 @@ impl SessionService for Session { response.revised_session_timeout ); self.spawn_session_activity_task(response.revised_session_timeout); + self.spawn_subscription_activity_task(); // TODO Verify signature using server's public key (from endpoint) comparing with data made from client certificate and nonce. // crypto::verify_signature_data(verification_key, security_policy, server_certificate, client_certificate, client_nonce); diff --git a/lib/src/client/session/session_state.rs b/lib/src/client/session/session_state.rs index 00aebcb94..bff0cc0bb 100644 --- a/lib/src/client/session/session_state.rs +++ b/lib/src/client/session/session_state.rs @@ -13,6 +13,8 @@ use std::{ use chrono::Duration; +use tokio::time::Instant; + use crate::core::{ comms::secure_channel::SecureChannel, handle::Handle, supported_message::SupportedMessage, }; @@ -23,6 +25,7 @@ use crate::client::{ callbacks::{OnConnectionStatusChange, OnSessionClosed}, message_queue::MessageQueue, process_unexpected_response, + subscription_state::SubscriptionState, }; #[derive(Copy, Clone, PartialEq, Debug)] @@ -129,6 +132,8 @@ pub(crate) struct SessionState { monitored_item_handle: Handle, /// Subscription acknowledgements pending for send subscription_acknowledgements: Vec, + /// Subscription state + subscription_state: Arc>, /// The message queue message_queue: Arc>, /// Connection closed callback @@ -165,6 +170,7 @@ impl SessionState { pub fn new( ignore_clock_skew: bool, secure_channel: Arc>, + subscription_state: Arc>, message_queue: Arc>, ) -> SessionState { let id = NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed); @@ -183,8 +189,9 @@ impl SessionState { session_id: NodeId::null(), authentication_token: NodeId::null(), monitored_item_handle: Handle::new(Self::FIRST_MONITORED_ITEM_HANDLE), - message_queue, subscription_acknowledgements: Vec::new(), + subscription_state, + message_queue, session_closed_callback: None, connection_status_callback: None, } @@ -194,6 +201,11 @@ impl SessionState { self.id } + pub fn set_client_offset(&mut self, offset: Duration) { + self.client_offset = self.client_offset + offset; + debug!("Client offset set to {}", self.client_offset); + } + pub fn set_session_id(&mut self, session_id: NodeId) { self.session_id = session_id } @@ -297,6 +309,12 @@ impl SessionState { subscription_acknowledgements, }; let request_handle = self.async_send_request(request, None)?; + + { + let mut subscription_state = trace_write_lock!(self.subscription_state); + subscription_state.set_last_publish_request(Instant::now()); + } + debug!("async_publish, request sent with handle {}", request_handle); Ok(request_handle) } @@ -453,15 +471,14 @@ impl SessionState { // server and use that offset to compensate for the difference in time when setting // the timestamps in the request headers and when decoding timestamps in messages // received from the server. - if self.ignore_clock_skew { + if self.ignore_clock_skew && !response.response_header.timestamp.is_null() { let offset = response.response_header.timestamp - DateTime::now(); // Make sure to apply the offset to the security token in the current response. security_token.created_at = security_token.created_at - offset; // Update the client offset by adding the new offset. When the secure channel is // renewed its already using the client offset calculated when issuing the secure // channel and only needs to be updated to accommodate any additional clock skew. - self.client_offset = self.client_offset + offset; - debug!("Client offset set to {}", self.client_offset); + self.set_client_offset(offset); } debug!("Setting transport's security token"); diff --git a/lib/src/client/subscription_state.rs b/lib/src/client/subscription_state.rs index 07e91399a..d9e777e38 100644 --- a/lib/src/client/subscription_state.rs +++ b/lib/src/client/subscription_state.rs @@ -4,12 +4,18 @@ use std::collections::HashMap; +use tokio::time::Instant; + use crate::types::service_types::{DataChangeNotification, EventNotificationList}; use super::subscription::*; /// Holds the live subscription state pub struct SubscriptionState { + /// Subscripion keep alive timeout + keep_alive_timeout: Option, + /// Timestamp of last pushish request + last_publish_request: Instant, /// Subscriptions (key = subscription_id) subscriptions: HashMap, } @@ -17,6 +23,8 @@ pub struct SubscriptionState { impl SubscriptionState { pub fn new() -> SubscriptionState { SubscriptionState { + keep_alive_timeout: None, + last_publish_request: Instant::now(), subscriptions: HashMap::new(), } } @@ -40,6 +48,7 @@ impl SubscriptionState { pub(crate) fn add_subscription(&mut self, subscription: Subscription) { self.subscriptions .insert(subscription.subscription_id(), subscription); + self.set_keep_alive_timeout(); } pub(crate) fn modify_subscription( @@ -57,11 +66,14 @@ impl SubscriptionState { subscription.set_max_keep_alive_count(max_keep_alive_count); subscription.set_max_notifications_per_publish(max_notifications_per_publish); subscription.set_priority(priority); + self.set_keep_alive_timeout(); } } pub(crate) fn delete_subscription(&mut self, subscription_id: u32) -> Option { - self.subscriptions.remove(&subscription_id) + let subscription = self.subscriptions.remove(&subscription_id); + self.set_keep_alive_timeout(); + subscription } pub(crate) fn set_publishing_mode( @@ -129,4 +141,24 @@ impl SubscriptionState { subscription.set_triggering(triggering_item_id, links_to_add, links_to_remove); } } + + pub(crate) fn last_publish_request(&self) -> Instant { + self.last_publish_request + } + + pub(crate) fn set_last_publish_request(&mut self, now: Instant) { + self.last_publish_request = now; + } + + pub(crate) fn keep_alive_timeout(&self) -> Option { + self.keep_alive_timeout + } + + fn set_keep_alive_timeout(&mut self) { + self.keep_alive_timeout = self + .subscriptions + .values() + .map(|v| (v.publishing_interval() * v.lifetime_count() as f64).floor() as u64) + .min() + } } diff --git a/lib/src/core/comms/secure_channel.rs b/lib/src/core/comms/secure_channel.rs index df636cbac..23e2673e5 100644 --- a/lib/src/core/comms/secure_channel.rs +++ b/lib/src/core/comms/secure_channel.rs @@ -272,16 +272,9 @@ impl SecureChannel { /// Creates a nonce for the connection. The nonce should be the same size as the symmetric key pub fn create_random_nonce(&mut self) { - if self.security_policy != SecurityPolicy::None - && (self.security_mode == MessageSecurityMode::Sign - || self.security_mode == MessageSecurityMode::SignAndEncrypt) - { - self.local_nonce = vec![0u8; self.security_policy.secure_channel_nonce_length()]; - random::bytes(&mut self.local_nonce); - } else { - // Empty nonce - self.local_nonce = Vec::new(); - } + self.local_nonce + .resize(self.security_policy.secure_channel_nonce_length(), 0); + random::bytes(&mut self.local_nonce); } /// Sets the remote certificate @@ -1031,7 +1024,7 @@ impl SecureChannel { pub fn local_nonce_as_byte_string(&self) -> ByteString { if self.local_nonce.is_empty() { - ByteString::from(&[0u8; 32]) + ByteString::null() } else { ByteString::from(&self.local_nonce) } diff --git a/lib/src/crypto/security_policy.rs b/lib/src/crypto/security_policy.rs index 577632e76..93954544f 100644 --- a/lib/src/crypto/security_policy.rs +++ b/lib/src/crypto/security_policy.rs @@ -395,7 +395,10 @@ impl SecurityPolicy { | SecurityPolicy::Basic256Sha256 | SecurityPolicy::Aes128Sha256RsaOaep | SecurityPolicy::Aes256Sha256RsaPss => 32, - _ => panic!(""), + // The nonce can be used for password or X509 authentication + // even when the security policy is None. + // see https://github.com/advisories/GHSA-pq4w-qm9g-qx68 + SecurityPolicy::None | SecurityPolicy::Unknown => 32, } } diff --git a/tools/schema/npm-shrinkwrap.json b/tools/schema/npm-shrinkwrap.json index 123440ccf..fc189a412 100644 --- a/tools/schema/npm-shrinkwrap.json +++ b/tools/schema/npm-shrinkwrap.json @@ -199,9 +199,9 @@ "dev": true }, "node_modules/minimist": { - "version": "1.2.5", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz", - "integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==", + "version": "1.2.6", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz", + "integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q==", "dev": true }, "node_modules/ndjson": { @@ -567,9 +567,9 @@ "dev": true }, "minimist": { - "version": "1.2.5", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.5.tgz", - "integrity": "sha512-FM9nNUYrRBAELZQT3xeZQ7fmMOBg6nWNmJKTcgsJeaLstP/UODVpGsr5OhXhhXg6f+qtJ8uiZ+PUxkDWcgIXLw==", + "version": "1.2.6", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz", + "integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q==", "dev": true }, "ndjson": {