Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into single-crate
Browse files Browse the repository at this point in the history
# Conflicts:
#	lib/src/client/session/session_state.rs
#	lib/src/client/subscription_state.rs
#	lib/src/core/comms/secure_channel.rs
  • Loading branch information
locka99 committed Apr 12, 2022
2 parents 5a267b4 + 3455b65 commit 668f0f8
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 25 deletions.
84 changes: 82 additions & 2 deletions lib/src/client/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,24 @@ impl Session {
Role::Client,
decoding_options,
)));

let message_queue = Arc::new(RwLock::new(MessageQueue::new()));

let subscription_state = Arc::new(RwLock::new(SubscriptionState::new()));

let session_state = Arc::new(RwLock::new(SessionState::new(
ignore_clock_skew,
secure_channel.clone(),
subscription_state.clone(),
message_queue.clone(),
)));

let transport = TcpTransport::new(
secure_channel.clone(),
session_state.clone(),
message_queue.clone(),
single_threaded_executor,
);
let subscription_state = Arc::new(RwLock::new(SubscriptionState::new()));

// This runtime is single threaded. The one for the transport may be multi-threaded
let runtime = tokio::runtime::Builder::new_current_thread()
Expand Down Expand Up @@ -235,6 +240,7 @@ impl Session {
self.session_state = Arc::new(RwLock::new(SessionState::new(
self.ignore_clock_skew,
self.secure_channel.clone(),
self.subscription_state.clone(),
self.message_queue.clone(),
)));

Expand Down Expand Up @@ -857,6 +863,69 @@ impl Session {
});
}

/// Start a task that will periodically send a publish request to keep the subscriptions alive.
/// The request rate will be 3/4 of the shortest (revised publishing interval * the revised keep
/// alive count) of all subscriptions that belong to a single session.
fn spawn_subscription_activity_task(&self) {
session_debug!(self, "spawn_subscription_activity_task",);

let connection_state = {
let session_state = trace_read_lock!(self.session_state);
session_state.connection_state()
};

const MIN_SUBSCRIPTION_ACTIVITY_MS: u64 = 1000;
let session_state = self.session_state.clone();
let subscription_state = self.subscription_state.clone();

let id = format!("subscription-activity-thread-{:?}", thread::current().id());
let runtime = trace_lock!(self.runtime);
runtime.spawn(async move {
register_runtime_component!(&id);

// The timer runs at a higher frequency timer loop to terminate as soon after the session
// state has terminated. Each time it runs it will test if the interval has elapsed or not.
let mut timer = interval(Duration::from_millis(MIN_SUBSCRIPTION_ACTIVITY_MS));

let mut last_timeout: Instant;
let mut subscription_activity_interval: Duration;

loop {
timer.tick().await;

if connection_state.is_finished() {
info!("Session activity timer is terminating");
break;
}

if let (Some(keep_alive_timeout), last_publish_request) = {
let subscription_state = trace_read_lock!(subscription_state);
(
subscription_state.keep_alive_timeout(),
subscription_state.last_publish_request(),
)
} {
subscription_activity_interval =
Duration::from_millis((keep_alive_timeout / 4) * 3);
last_timeout = last_publish_request;

// Get the time now
let now = Instant::now();

// Calculate to interval since last check
let interval = now - last_timeout;
if interval > subscription_activity_interval {
let mut session_state = trace_write_lock!(session_state);
let _ = session_state.async_publish();
}
}
}

info!("Subscription activity timer task is finished");
deregister_runtime_component!(&id);
});
}

/// This is the internal handler for create subscription that receives the callback wrapped up and reference counted.
fn create_subscription_inner(
&self,
Expand Down Expand Up @@ -1254,7 +1323,10 @@ impl Session {
// Turn off publish requests until server says otherwise
debug!("Server tells us too many publish requests so waiting for a response before resuming");
}
StatusCode::BadSessionClosed | StatusCode::BadSessionIdInvalid => {
StatusCode::BadSessionClosed
| StatusCode::BadSessionIdInvalid
| StatusCode::BadNoSubscription
| StatusCode::BadSubscriptionIdInvalid => {
let mut session_state = trace_write_lock!(self.session_state);
session_state.on_session_closed(service_result)
}
Expand Down Expand Up @@ -1446,6 +1518,13 @@ impl SessionService for Session {
let _ = secure_channel
.set_remote_cert_from_byte_string(&response.server_certificate);
}
// When ignoring clock skew, we calculate the time offset between the client
// and the server and use that to compensate for the difference in time.
if self.ignore_clock_skew && !response.response_header.timestamp.is_null() {
let offset = response.response_header.timestamp - DateTime::now();
// Update the client offset by adding the new offset.
session_state.set_client_offset(offset);
}
session_state.session_id()
};

Expand Down Expand Up @@ -1496,6 +1575,7 @@ impl SessionService for Session {
response.revised_session_timeout
);
self.spawn_session_activity_task(response.revised_session_timeout);
self.spawn_subscription_activity_task();

// TODO Verify signature using server's public key (from endpoint) comparing with data made from client certificate and nonce.
// crypto::verify_signature_data(verification_key, security_policy, server_certificate, client_certificate, client_nonce);
Expand Down
25 changes: 21 additions & 4 deletions lib/src/client/session/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::{

use chrono::Duration;

use tokio::time::Instant;

use crate::core::{
comms::secure_channel::SecureChannel, handle::Handle, supported_message::SupportedMessage,
};
Expand All @@ -23,6 +25,7 @@ use crate::client::{
callbacks::{OnConnectionStatusChange, OnSessionClosed},
message_queue::MessageQueue,
process_unexpected_response,
subscription_state::SubscriptionState,
};

#[derive(Copy, Clone, PartialEq, Debug)]
Expand Down Expand Up @@ -129,6 +132,8 @@ pub(crate) struct SessionState {
monitored_item_handle: Handle,
/// Subscription acknowledgements pending for send
subscription_acknowledgements: Vec<SubscriptionAcknowledgement>,
/// Subscription state
subscription_state: Arc<RwLock<SubscriptionState>>,
/// The message queue
message_queue: Arc<RwLock<MessageQueue>>,
/// Connection closed callback
Expand Down Expand Up @@ -165,6 +170,7 @@ impl SessionState {
pub fn new(
ignore_clock_skew: bool,
secure_channel: Arc<RwLock<SecureChannel>>,
subscription_state: Arc<RwLock<SubscriptionState>>,
message_queue: Arc<RwLock<MessageQueue>>,
) -> SessionState {
let id = NEXT_SESSION_ID.fetch_add(1, Ordering::Relaxed);
Expand All @@ -183,8 +189,9 @@ impl SessionState {
session_id: NodeId::null(),
authentication_token: NodeId::null(),
monitored_item_handle: Handle::new(Self::FIRST_MONITORED_ITEM_HANDLE),
message_queue,
subscription_acknowledgements: Vec::new(),
subscription_state,
message_queue,
session_closed_callback: None,
connection_status_callback: None,
}
Expand All @@ -194,6 +201,11 @@ impl SessionState {
self.id
}

pub fn set_client_offset(&mut self, offset: Duration) {
self.client_offset = self.client_offset + offset;
debug!("Client offset set to {}", self.client_offset);
}

pub fn set_session_id(&mut self, session_id: NodeId) {
self.session_id = session_id
}
Expand Down Expand Up @@ -297,6 +309,12 @@ impl SessionState {
subscription_acknowledgements,
};
let request_handle = self.async_send_request(request, None)?;

{
let mut subscription_state = trace_write_lock!(self.subscription_state);
subscription_state.set_last_publish_request(Instant::now());
}

debug!("async_publish, request sent with handle {}", request_handle);
Ok(request_handle)
}
Expand Down Expand Up @@ -453,15 +471,14 @@ impl SessionState {
// server and use that offset to compensate for the difference in time when setting
// the timestamps in the request headers and when decoding timestamps in messages
// received from the server.
if self.ignore_clock_skew {
if self.ignore_clock_skew && !response.response_header.timestamp.is_null() {
let offset = response.response_header.timestamp - DateTime::now();
// Make sure to apply the offset to the security token in the current response.
security_token.created_at = security_token.created_at - offset;
// Update the client offset by adding the new offset. When the secure channel is
// renewed its already using the client offset calculated when issuing the secure
// channel and only needs to be updated to accommodate any additional clock skew.
self.client_offset = self.client_offset + offset;
debug!("Client offset set to {}", self.client_offset);
self.set_client_offset(offset);
}

debug!("Setting transport's security token");
Expand Down
34 changes: 33 additions & 1 deletion lib/src/client/subscription_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,27 @@

use std::collections::HashMap;

use tokio::time::Instant;

use crate::types::service_types::{DataChangeNotification, EventNotificationList};

use super::subscription::*;

/// Holds the live subscription state
pub struct SubscriptionState {
/// Subscripion keep alive timeout
keep_alive_timeout: Option<u64>,
/// Timestamp of last pushish request
last_publish_request: Instant,
/// Subscriptions (key = subscription_id)
subscriptions: HashMap<u32, Subscription>,
}

impl SubscriptionState {
pub fn new() -> SubscriptionState {
SubscriptionState {
keep_alive_timeout: None,
last_publish_request: Instant::now(),
subscriptions: HashMap::new(),
}
}
Expand All @@ -40,6 +48,7 @@ impl SubscriptionState {
pub(crate) fn add_subscription(&mut self, subscription: Subscription) {
self.subscriptions
.insert(subscription.subscription_id(), subscription);
self.set_keep_alive_timeout();
}

pub(crate) fn modify_subscription(
Expand All @@ -57,11 +66,14 @@ impl SubscriptionState {
subscription.set_max_keep_alive_count(max_keep_alive_count);
subscription.set_max_notifications_per_publish(max_notifications_per_publish);
subscription.set_priority(priority);
self.set_keep_alive_timeout();
}
}

pub(crate) fn delete_subscription(&mut self, subscription_id: u32) -> Option<Subscription> {
self.subscriptions.remove(&subscription_id)
let subscription = self.subscriptions.remove(&subscription_id);
self.set_keep_alive_timeout();
subscription
}

pub(crate) fn set_publishing_mode(
Expand Down Expand Up @@ -129,4 +141,24 @@ impl SubscriptionState {
subscription.set_triggering(triggering_item_id, links_to_add, links_to_remove);
}
}

pub(crate) fn last_publish_request(&self) -> Instant {
self.last_publish_request
}

pub(crate) fn set_last_publish_request(&mut self, now: Instant) {
self.last_publish_request = now;
}

pub(crate) fn keep_alive_timeout(&self) -> Option<u64> {
self.keep_alive_timeout
}

fn set_keep_alive_timeout(&mut self) {
self.keep_alive_timeout = self
.subscriptions
.values()
.map(|v| (v.publishing_interval() * v.lifetime_count() as f64).floor() as u64)
.min()
}
}
15 changes: 4 additions & 11 deletions lib/src/core/comms/secure_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,9 @@ impl SecureChannel {

/// Creates a nonce for the connection. The nonce should be the same size as the symmetric key
pub fn create_random_nonce(&mut self) {
if self.security_policy != SecurityPolicy::None
&& (self.security_mode == MessageSecurityMode::Sign
|| self.security_mode == MessageSecurityMode::SignAndEncrypt)
{
self.local_nonce = vec![0u8; self.security_policy.secure_channel_nonce_length()];
random::bytes(&mut self.local_nonce);
} else {
// Empty nonce
self.local_nonce = Vec::new();
}
self.local_nonce
.resize(self.security_policy.secure_channel_nonce_length(), 0);
random::bytes(&mut self.local_nonce);
}

/// Sets the remote certificate
Expand Down Expand Up @@ -1031,7 +1024,7 @@ impl SecureChannel {

pub fn local_nonce_as_byte_string(&self) -> ByteString {
if self.local_nonce.is_empty() {
ByteString::from(&[0u8; 32])
ByteString::null()
} else {
ByteString::from(&self.local_nonce)
}
Expand Down
5 changes: 4 additions & 1 deletion lib/src/crypto/security_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,10 @@ impl SecurityPolicy {
| SecurityPolicy::Basic256Sha256
| SecurityPolicy::Aes128Sha256RsaOaep
| SecurityPolicy::Aes256Sha256RsaPss => 32,
_ => panic!(""),
// The nonce can be used for password or X509 authentication
// even when the security policy is None.
// see https://github.com/advisories/GHSA-pq4w-qm9g-qx68
SecurityPolicy::None | SecurityPolicy::Unknown => 32,
}
}

Expand Down
12 changes: 6 additions & 6 deletions tools/schema/npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 668f0f8

Please sign in to comment.