Skip to content

Commit

Permalink
Fix a deadlock in GetMonitoredItems method call. Compliance improveme…
Browse files Browse the repository at this point in the history
…nts.
  • Loading branch information
locka99 committed Oct 28, 2021
1 parent c458d4e commit 9036764
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 79 deletions.
13 changes: 5 additions & 8 deletions samples/demo-server/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
use std::sync::{Arc, RwLock};

use opcua_server::{
address_space::method::MethodBuilder,
callbacks,
prelude::*,
session::{Session, SessionManager},
address_space::method::MethodBuilder, callbacks, prelude::*, session::SessionManager,
};

pub fn add_methods(server: &mut Server, ns: u16) {
Expand Down Expand Up @@ -64,7 +61,7 @@ struct NoOp;
impl callbacks::Method for NoOp {
fn call(
&mut self,
_session: &mut Session,
_session_id: &NodeId,
_session_map: Arc<RwLock<SessionManager>>,
_request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand All @@ -83,7 +80,7 @@ struct Boop;
impl callbacks::Method for Boop {
fn call(
&mut self,
_session: &mut Session,
_session_id: &NodeId,
_session_map: Arc<RwLock<SessionManager>>,
request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand Down Expand Up @@ -126,7 +123,7 @@ struct HelloWorld;
impl callbacks::Method for HelloWorld {
fn call(
&mut self,
_session: &mut Session,
_session_id: &NodeId,
_session_map: Arc<RwLock<SessionManager>>,
_request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand All @@ -146,7 +143,7 @@ struct HelloX;
impl callbacks::Method for HelloX {
fn call(
&mut self,
_session: &mut Session,
_session_id: &NodeId,
_session_map: Arc<RwLock<SessionManager>>,
request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand Down
5 changes: 2 additions & 3 deletions server/src/address_space/address_space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::{
callbacks, constants,
diagnostics::ServerDiagnostics,
historical::HistoryServerCapabilities,
session::Session,
state::ServerState,
};

Expand Down Expand Up @@ -1153,7 +1152,7 @@ impl AddressSpace {
pub fn call_method(
&mut self,
_server_state: &ServerState,
session: &mut Session,
session_id: &NodeId,
session_manager: Arc<RwLock<SessionManager>>,
request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand All @@ -1180,7 +1179,7 @@ impl AddressSpace {
} else if let Some(method) = self.find_mut(method_id) {
// TODO check security - session / user may not have permission to call methods
match method {
NodeType::Method(method) => method.call(session, session_manager, request),
NodeType::Method(method) => method.call(session_id, session_manager, request),
_ => Err(StatusCode::BadMethodInvalid),
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions server/src/address_space/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
node::{Node, NodeBase},
variable::VariableBuilder,
},
session::{Session, SessionManager},
session::SessionManager,
};

node_builder_impl!(MethodBuilder, Method);
Expand Down Expand Up @@ -237,13 +237,13 @@ impl Method {

pub fn call(
&mut self,
session: &mut Session,
session_id: &NodeId,
session_manager: Arc<RwLock<SessionManager>>,
request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
if let Some(ref mut callback) = self.callback {
// Call the handler
callback.call(session, session_manager, request)
callback.call(session_id, session_manager, request)
} else {
error!(
"Method call to {} has no handler, treating as invalid",
Expand Down
96 changes: 59 additions & 37 deletions server/src/address_space/method_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use opcua_types::service_types::{CallMethodRequest, CallMethodResult};
use opcua_types::status_code::StatusCode;
use opcua_types::*;

use crate::{
callbacks::Method,
session::{Session, SessionManager},
};
use crate::{callbacks::Method, session::SessionManager};

/// Count the number of provided input arguments, comparing them to the expected number.
fn ensure_input_argument_count(
Expand All @@ -23,13 +20,16 @@ fn ensure_input_argument_count(
if actual == expected {
Ok(())
} else if actual < expected {
debug!("Method call fails BadArgumentsMissing");
Err(StatusCode::BadArgumentsMissing)
} else {
debug!("Method call fails BadTooManyArguments");
Err(StatusCode::BadTooManyArguments)
}
} else if expected == 0 {
Ok(())
} else {
debug!("Method call fails BadArgumentsMissing");
Err(StatusCode::BadArgumentsMissing)
}
}
Expand All @@ -51,15 +51,15 @@ macro_rules! get_input_argument {

/// Search all sessions in the session map except the specified one for a matching subscription id
fn subscription_exists_on_other_session(
session: &mut Session,
this_session_id: &NodeId,
session_manager: Arc<RwLock<SessionManager>>,
subscription_id: u32,
) -> bool {
// Check if the subscription exists on another session
let session_manager = trace_read_lock_unwrap!(session_manager);
session_manager.sessions.iter().any(|(_, s)| {
let s = trace_read_lock_unwrap!(s);
s.session_id() != session.session_id() && session.subscriptions().contains(subscription_id)
s.session_id() != this_session_id && s.subscriptions().contains(subscription_id)
})
}

Expand All @@ -69,7 +69,7 @@ pub struct ServerResendDataMethod;
impl Method for ServerResendDataMethod {
fn call(
&mut self,
session: &mut Session,
session_id: &NodeId,
session_manager: Arc<RwLock<SessionManager>>,
request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand All @@ -88,15 +88,25 @@ impl Method for ServerResendDataMethod {

let subscription_id = get_input_argument!(request, 0, UInt32)?;

if let Some(subscription) = session.subscriptions_mut().get_mut(*subscription_id) {
subscription.set_resend_data();
Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: None,
})
} else if subscription_exists_on_other_session(session, session_manager, *subscription_id) {
{
let session_manager = trace_read_lock_unwrap!(session_manager);
if let Some(session) = session_manager.find_session_by_id(session_id) {
let mut session = trace_write_lock_unwrap!(session);
if let Some(subscription) = session.subscriptions_mut().get_mut(*subscription_id) {
subscription.set_resend_data();
return Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: None,
});
};
} else {
return Err(StatusCode::BadSessionIdInvalid);
}
}

if subscription_exists_on_other_session(session_id, session_manager, *subscription_id) {
Err(StatusCode::BadUserAccessDenied)
} else {
Err(StatusCode::BadSubscriptionIdInvalid)
Expand All @@ -110,7 +120,7 @@ pub struct ServerGetMonitoredItemsMethod;
impl Method for ServerGetMonitoredItemsMethod {
fn call(
&mut self,
session: &mut Session,
session_id: &NodeId,
session_manager: Arc<RwLock<SessionManager>>,
request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand All @@ -131,29 +141,41 @@ impl Method for ServerGetMonitoredItemsMethod {

let subscription_id = get_input_argument!(request, 0, UInt32)?;

if let Some(subscription) = session
.subscriptions()
.subscriptions()
.get(&subscription_id)
// Check for subscription on the session supplied
{
// Response
// serverHandles: Vec<u32>
// clientHandles: Vec<u32>
let (server_handles, client_handles) = subscription.get_handles();

let server_handles = Variant::from(server_handles);
let client_handles = Variant::from(client_handles);
let output_arguments = vec![server_handles, client_handles];

Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: Some(output_arguments),
})
} else if subscription_exists_on_other_session(session, session_manager, *subscription_id) {
let session_manager = trace_read_lock_unwrap!(session_manager);
if let Some(session) = session_manager.find_session_by_id(session_id) {
let session = trace_read_lock_unwrap!(session);
if let Some(subscription) = session
.subscriptions()
.subscriptions()
.get(&subscription_id)
{
// Response
// serverHandles: Vec<u32>
// clientHandles: Vec<u32>
let (server_handles, client_handles) = subscription.get_handles();

let server_handles = Variant::from(server_handles);
let client_handles = Variant::from(client_handles);
let output_arguments = vec![server_handles, client_handles];

return Ok(CallMethodResult {
status_code: StatusCode::Good,
input_argument_results: Some(vec![StatusCode::Good]),
input_argument_diagnostic_infos: None,
output_arguments: Some(output_arguments),
});
};
} else {
return Err(StatusCode::BadSessionIdInvalid);
}
}
if subscription_exists_on_other_session(session_id, session_manager, *subscription_id) {
debug!("Method handler for GetMonitoredItems returns BadUserAccessDenied");
Err(StatusCode::BadUserAccessDenied)
} else {
debug!("Method handler for GetMonitoredItems returns BadSubscriptionIdInvalid");
Err(StatusCode::BadSubscriptionIdInvalid)
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub trait Method {
/// be invoked to handle the call.
fn call(
&mut self,
session: &mut Session,
session_id: &NodeId,
session_manager: Arc<RwLock<SessionManager>>,
request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode>;
Expand Down
10 changes: 7 additions & 3 deletions server/src/services/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,13 @@ impl MessageHandler {
// Method Service Set, OPC UA Part 4, Section 5.11
SupportedMessage::CallRequest(request) => {
self.validate_service_request(message, CALL_COUNT, |session, session_manager| {
let session_id = {
let session = trace_read_lock_unwrap!(session);
session.session_id().clone()
};
Some(self.method_service.call(
server_state,
session,
&session_id,
session_manager,
address_space,
request,
Expand Down Expand Up @@ -510,7 +514,7 @@ impl MessageHandler {
// Look up the session from a map to see if it exists
let session = {
let session_manager = trace_write_lock_unwrap!(self.session_manager);
session_manager.find_session(&request_header.authentication_token)
session_manager.find_session_by_token(&request_header.authentication_token)
};
if let Some(session) = session {
let (response, authorized) = if let Err(response) =
Expand Down Expand Up @@ -546,7 +550,7 @@ impl MessageHandler {
let session_manager = self.session_manager.clone();
let session = {
let session_manager = trace_write_lock_unwrap!(session_manager);
session_manager.find_session(&request_header.authentication_token)
session_manager.find_session_by_token(&request_header.authentication_token)
};
if let Some(session) = session {
let (response, authorized) = if let Err(response) =
Expand Down
7 changes: 3 additions & 4 deletions server/src/services/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use opcua_core::supported_message::SupportedMessage;
use opcua_types::{status_code::StatusCode, *};

use crate::session::SessionManager;
use crate::{address_space::AddressSpace, services::Service, session::Session, state::ServerState};
use crate::{address_space::AddressSpace, services::Service, state::ServerState};

/// The method service. Allows a client to call a method on the server.
pub(crate) struct MethodService;
Expand All @@ -27,15 +27,14 @@ impl MethodService {
pub fn call(
&self,
server_state: Arc<RwLock<ServerState>>,
session: Arc<RwLock<Session>>,
session_id: &NodeId,
session_manager: Arc<RwLock<SessionManager>>,
address_space: Arc<RwLock<AddressSpace>>,
request: &CallRequest,
) -> SupportedMessage {
if let Some(ref calls) = request.methods_to_call {
let server_state = trace_read_lock_unwrap!(server_state);
if calls.len() <= server_state.operational_limits.max_nodes_per_method_call {
let mut session = trace_write_lock_unwrap!(session);
let mut address_space = trace_write_lock_unwrap!(address_space);

let results: Vec<CallMethodResult> = calls
Expand All @@ -54,7 +53,7 @@ impl MethodService {
// Call the method via whatever is registered in the address space
match address_space.call_method(
&server_state,
&mut session,
session_id,
session_manager.clone(),
request,
) {
Expand Down
2 changes: 1 addition & 1 deletion server/src/services/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl SessionService {
let server_state = trace_write_lock_unwrap!(server_state);
let session = {
let session_manager = trace_read_lock_unwrap!(session_manager);
session_manager.find_session(&request.request_header.authentication_token)
session_manager.find_session_by_token(&request.request_header.authentication_token)
};
if let Some(session) = session {
{
Expand Down
17 changes: 16 additions & 1 deletion server/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,24 @@ impl SessionManager {
self.sessions.clear();
}

/// Find a session by its session id and return it.
pub fn find_session_by_id(&self, session_id: &NodeId) -> Option<Arc<RwLock<Session>>> {
self.sessions
.iter()
.find(|s| {
let session = trace_read_lock_unwrap!(s.1);
session.session_id() == session_id
})
.map(|s| s.1)
.cloned()
}

/// Finds the session by its authentication token and returns it. The authentication token
/// can be renewed so it is not used as a key.
pub fn find_session(&self, authentication_token: &NodeId) -> Option<Arc<RwLock<Session>>> {
pub fn find_session_by_token(
&self,
authentication_token: &NodeId,
) -> Option<Arc<RwLock<Session>>> {
self.sessions
.iter()
.find(|s| {
Expand Down
2 changes: 1 addition & 1 deletion server/src/tests/address_space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ struct HelloWorld;
impl callbacks::Method for HelloWorld {
fn call(
&mut self,
_session: &mut Session,
_session_id: &NodeId,
_session_map: Arc<RwLock<SessionManager>>,
_request: &CallMethodRequest,
) -> Result<CallMethodResult, StatusCode> {
Expand Down
Loading

0 comments on commit 9036764

Please sign in to comment.