diff --git a/src/get.rs b/src/get.rs index 6fa5b510b..45b7d8882 100644 --- a/src/get.rs +++ b/src/get.rs @@ -156,26 +156,20 @@ pub unsafe extern "C" fn z_get( let key_expr = key_expr.transmute_ref(); let mut get = session.get(Selector::new(key_expr, p)); if let Some(options) = options { - if let Some(payload) = options.payload.as_mut() { + if let Some(payload) = unsafe { options.payload.as_mut() } { let payload = payload.transmute_mut().extract(); get = get.payload(payload); } - if let Some(encoding) = options.encoding.as_mut() { + if let Some(encoding) = unsafe { options.encoding.as_mut() } { let encoding = encoding.transmute_mut().extract(); get = get.encoding(encoding); } - if !options.source_info.is_null() { - let source_info = unsafe { options.source_info.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(source_info) = unsafe { options.source_info.as_mut() } { + let source_info = source_info.transmute_mut().extract(); get = get.source_info(source_info); } - if !options.attachment.is_null() { - let attachment = unsafe { options.attachment.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(attachment) = unsafe { options.attachment.as_mut() } { + let attachment = attachment.transmute_mut().extract(); get = get.attachment(attachment); } diff --git a/src/publication_cache.rs b/src/publication_cache.rs index 0c95c0a0f..92b006364 100644 --- a/src/publication_cache.rs +++ b/src/publication_cache.rs @@ -95,10 +95,8 @@ pub extern "C" fn ze_declare_publication_cache( if options.resources_limit != 0 { p = p.resources_limit(options.resources_limit) } - if !options.queryable_prefix.is_null() { - let queryable_prefix = unsafe { options.queryable_prefix.as_ref() } - .unwrap() - .transmute_ref(); + if let Some(queryable_prefix) = unsafe { options.queryable_prefix.as_ref() } { + let queryable_prefix = queryable_prefix.transmute_ref(); p = p.queryable_prefix(queryable_prefix.clone()); } } diff --git a/src/publisher.rs b/src/publisher.rs index e2035807d..f9e05037c 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -177,25 +177,16 @@ pub unsafe extern "C" fn z_publisher_put( let mut put = publisher.put(payload); if let Some(options) = options { - if !options.encoding.is_null() { - let encoding = unsafe { options.encoding.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(encoding) = unsafe { options.encoding.as_mut() } { + let encoding = encoding.transmute_mut().extract(); put = put.encoding(encoding); }; - if !options.source_info.is_null() { - let source_info = unsafe { options.source_info.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(source_info) = unsafe { options.source_info.as_mut() } { + let source_info = source_info.transmute_mut().extract(); put = put.source_info(source_info); - } - if !options.attachment.is_null() { - let attachment = unsafe { options.attachment.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + }; + if let Some(attachment) = unsafe { options.attachment.as_mut() } { + let attachment = attachment.transmute_mut().extract(); put = put.attachment(attachment); } } diff --git a/src/pull_subscriber.rs b/src/pull_subscriber.rs deleted file mode 100644 index 7c7b11868..000000000 --- a/src/pull_subscriber.rs +++ /dev/null @@ -1,220 +0,0 @@ -// -// Copyright (c) 2017, 2022 ZettaScale Technology. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh team, -// - -use crate::commons::*; -use crate::impl_guarded_transmute; -use crate::keyexpr::*; -use crate::session::*; -use crate::z_closure_sample_call; -use crate::z_owned_closure_sample_t; -use crate::z_reliability_t; -use crate::LOG_INVALID_SESSION; -use zenoh::prelude::sync::SyncResolve; -use zenoh::prelude::SessionDeclarations; -use zenoh_protocol::core::SubInfo; -use zenoh_util::core::zresult::ErrNo; - -/**************************************/ -/* DECLARATION */ -/**************************************/ -type PullSubscriber = Option>>; - -/// An owned zenoh pull subscriber. Destroying the subscriber cancels the subscription. -/// -/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. -/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. -/// -/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. -/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. -/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. -/// -/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. -#[cfg(not(target_arch = "arm"))] -#[repr(C, align(8))] -pub struct z_owned_pull_subscriber_t([u64; 1]); - -#[cfg(target_arch = "arm")] -#[repr(C, align(4))] -pub struct z_owned_pull_subscriber_t([u32; 1]); - -impl_guarded_transmute!(PullSubscriber, z_owned_pull_subscriber_t); - -#[repr(C)] -#[allow(non_camel_case_types)] -pub struct z_pull_subscriber_t<'a>(&'a z_owned_pull_subscriber_t); - -impl<'a> AsRef for z_pull_subscriber_t<'a> { - fn as_ref(&self) -> &PullSubscriber { - self.0 - } -} - -impl AsMut for z_owned_pull_subscriber_t { - fn as_mut(&mut self) -> &mut PullSubscriber { - unsafe { std::mem::transmute(self) } - } -} - -impl z_owned_pull_subscriber_t { - pub fn new(sub: zenoh::subscriber::PullSubscriber<'static, ()>) -> Self { - Some(Box::new(sub)).into() - } - pub fn null() -> Self { - None.into() - } -} - -/// Constructs a null safe-to-drop value of 'z_owned_pull_subscriber_t' type -#[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub extern "C" fn z_pull_subscriber_null() -> z_owned_pull_subscriber_t { - z_owned_pull_subscriber_t::null() -} - -/// Represents the set of options that can be applied to a pull subscriber, -/// upon its declaration via `z_declare_pull_subscriber`. -/// -/// Members: -/// z_reliability_t reliability: The subscription reliability. -#[repr(C)] -#[allow(non_camel_case_types)] -pub struct z_pull_subscriber_options_t { - reliability: z_reliability_t, -} - -/// Constructs the default value for `z_pull_subscriber_options_t`. -#[no_mangle] -pub extern "C" fn z_pull_subscriber_options_default() -> z_pull_subscriber_options_t { - let info = SubInfo::default(); - z_pull_subscriber_options_t { - reliability: info.reliability.into(), - } -} - -/// Declares a pull subscriber for a given key expression. -/// -/// Parameters: -/// session: The zenoh session. -/// keyexpr: The key expression to subscribe. -/// callback: The callback function that will be called each time a data matching the subscribed expression is received. -/// opts: Additional options for the pull subscriber. -/// -/// Returns: -/// A `z_owned_subscriber_t`. -/// -/// To check if the subscription succeeded and if the pull subscriber is still valid, -/// you may use `z_pull_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. -/// -/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. -/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. -/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. -/// -/// Example: -/// Declaring a subscriber passing ``NULL`` for the options: -/// -/// .. code-block:: C -/// -/// z_owned_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); -/// -/// is equivalent to initializing and passing the default subscriber options: -/// -/// .. code-block:: C -/// -/// z_subscriber_options_t options = z_subscriber_options_default(); -/// z_owned_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); -#[no_mangle] -#[allow(clippy::missing_safety_doc)] -pub extern "C" fn z_declare_pull_subscriber( - session: z_loaned_session_t, - keyexpr: z_loaned_keyexpr_t, - callback: &mut z_owned_closure_sample_t, - options: Option<&z_pull_subscriber_options_t>, -) -> z_owned_pull_subscriber_t { - let mut closure = z_owned_closure_sample_t::empty(); - std::mem::swap(callback, &mut closure); - - match session.upgrade() { - Some(s) => { - let mut res = s - .declare_subscriber(keyexpr) - .callback(move |sample| { - let sample = z_loaned_sample_t::new(&sample); - z_closure_sample_call(&closure, &sample) - }) - .pull_mode(); - if let Some(opts) = opts { - res = res.reliability(opts.reliability.into()) - } - match res.res() { - Ok(sub) => z_owned_pull_subscriber_t::new(sub), - Err(e) => { - log::debug!("{}", e); - z_owned_pull_subscriber_t::null() - } - } - } - None => { - log::debug!("{}", LOG_INVALID_SESSION); - z_owned_pull_subscriber_t::null() - } - } -} - -/// Undeclares the given `z_owned_pull_subscriber_t`, droping it and invalidating it for double-drop safety. -#[allow(clippy::missing_safety_doc)] -#[no_mangle] -pub extern "C" fn z_undeclare_pull_subscriber(sub: &mut z_owned_pull_subscriber_t) -> i8 { - if let Some(s) = sub.as_mut().take() { - if let Err(e) = s.undeclare().res_sync() { - log::warn!("{}", e); - return e.errno().get(); - } - } - 0 -} - -/// Returns ``true`` if `sub` is valid. -#[allow(clippy::missing_safety_doc)] -#[no_mangle] -pub extern "C" fn z_pull_subscriber_check(sub: &z_owned_pull_subscriber_t) -> bool { - sub.as_ref().is_some() -} - -/// Returns ``true`` if `sub` is valid. -#[allow(clippy::missing_safety_doc)] -#[no_mangle] -pub extern "C" fn z_pull_subscriber_loan(sub: &z_owned_pull_subscriber_t) -> z_pull_subscriber_t { - z_pull_subscriber_t(sub) -} - -/// Pull data for `z_owned_pull_subscriber_t`. The pulled data will be provided -/// by calling the **callback** function provided to the `z_declare_subscriber` function. -/// -/// Parameters: -/// sub: The `z_owned_pull_subscriber_t` to pull from. -#[allow(clippy::missing_safety_doc)] -#[no_mangle] -pub extern "C" fn z_subscriber_pull(sub: z_pull_subscriber_t) -> i8 { - match sub.0.as_ref() { - Some(tx) => { - if let Err(e) = tx.pull().res_sync() { - log::error!("{}", e); - e.errno().get() - } else { - 0 - } - } - None => i8::MIN, - } -} diff --git a/src/put.rs b/src/put.rs index 7eae86dd3..073b9880e 100644 --- a/src/put.rs +++ b/src/put.rs @@ -82,25 +82,16 @@ pub extern "C" fn z_put( let mut put = session.put(key_expr, payload); if let Some(options) = options { - if !options.encoding.is_null() { - let encoding = unsafe { options.encoding.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(encoding) = unsafe { options.encoding.as_mut() } { + let encoding = encoding.transmute_mut().extract(); put = put.encoding(encoding); }; - if !options.source_info.is_null() { - let source_info = unsafe { options.source_info.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(source_info) = unsafe { options.source_info.as_mut() } { + let source_info = source_info.transmute_mut().extract(); put = put.source_info(source_info); }; - if !options.attachment.is_null() { - let attachment = unsafe { options.attachment.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(attachment) = unsafe { options.attachment.as_mut() } { + let attachment = attachment.transmute_mut().extract(); put = put.attachment(attachment); } put = put.priority(options.priority.into()); diff --git a/src/queryable.rs b/src/queryable.rs index 8efe0c849..a26f54b90 100644 --- a/src/queryable.rs +++ b/src/queryable.rs @@ -232,7 +232,7 @@ pub extern "C" fn z_queryable_check(this: &z_owned_queryable_t) -> bool { /// @return 0 in case of success, negative error code otherwise. #[allow(clippy::missing_safety_doc)] #[no_mangle] -pub unsafe extern "C" fn z_query_reply( +pub extern "C" fn z_query_reply( this: &z_loaned_query_t, key_expr: &z_loaned_keyexpr_t, payload: &mut z_owned_bytes_t, @@ -244,25 +244,16 @@ pub unsafe extern "C" fn z_query_reply( let mut reply = query.reply(key_expr, payload); if let Some(options) = options { - if !options.encoding.is_null() { - let encoding = unsafe { options.encoding.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(encoding) = unsafe { options.encoding.as_mut() } { + let encoding = encoding.transmute_mut().extract(); reply = reply.encoding(encoding); }; - if !options.source_info.is_null() { - let source_info = unsafe { options.source_info.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(source_info) = unsafe { options.source_info.as_mut() } { + let source_info = source_info.transmute_mut().extract(); reply = reply.source_info(source_info); }; - if !options.attachment.is_null() { - let attachment = unsafe { options.attachment.as_mut() } - .unwrap() - .transmute_mut() - .extract(); + if let Some(attachment) = unsafe { options.attachment.as_mut() } { + let attachment = attachment.transmute_mut().extract(); reply = reply.attachment(attachment); } } diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs index 7e3dddbc4..72182a264 100644 --- a/src/querying_subscriber.rs +++ b/src/querying_subscriber.rs @@ -35,6 +35,8 @@ use crate::{ use crate::{zcu_locality_default, zcu_locality_t}; use zenoh::core::Wait; use zenoh::prelude::SessionDeclarations; +use zenoh::sample::SampleBuilderTrait; +use zenoh::sample::ValueBuilderTrait; use zenoh::session::Session; use zenoh::subscriber::Reliability; use zenoh_ext::*; @@ -140,12 +142,9 @@ pub unsafe extern "C" fn ze_declare_querying_subscriber( { sub = sub.allowed_origin(options.allowed_origin.into()); } - if !options.query_selector.is_null() { - let query_selector = unsafe { options.query_selector.as_ref() } - .unwrap() - .transmute_ref() - .clone(); - sub = sub.query_selector(query_selector) + if let Some(query_selector) = unsafe { options.query_selector.as_ref() } { + let query_selector = query_selector.transmute_ref().clone(); + sub = sub.query_selector(query_selector); } if options.query_timeout_ms != 0 { sub = sub.query_timeout(std::time::Duration::from_millis(options.query_timeout_ms)); @@ -185,15 +184,37 @@ pub unsafe extern "C" fn ze_querying_subscriber_get( if let Err(e) = sub .0 .fetch({ - move |cb| match options { - Some(options) => session - .get(selector) - .target(options.target.into()) - .consolidation(options.consolidation) - .timeout(std::time::Duration::from_millis(options.timeout_ms)) - .callback(cb) - .wait(), - None => session.get(selector).callback(cb).wait(), + move |cb| { + let mut get = session.get(selector).callback(cb); + + if let Some(options) = options { + if let Some(payload) = unsafe { options.payload.as_mut() } { + let payload = payload.transmute_mut().extract(); + get = get.payload(payload); + } + if let Some(encoding) = unsafe { options.encoding.as_mut() } { + let encoding = encoding.transmute_mut().extract(); + get = get.encoding(encoding); + } + if let Some(source_info) = unsafe { options.source_info.as_mut() } { + let source_info = source_info.transmute_mut().extract(); + get = get.source_info(source_info); + } + if let Some(attachment) = unsafe { options.attachment.as_mut() } { + let attachment = attachment.transmute_mut().extract(); + get = get.attachment(attachment); + } + + get = get + .consolidation(options.consolidation) + .target(options.target.into()); + + if options.timeout_ms != 0 { + get = get.timeout(std::time::Duration::from_millis(options.timeout_ms)); + } + } + + get.wait() } }) .wait()