Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp and allowed_destination properties #428

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,10 @@ typedef struct z_publisher_options_t {
* If true, Zenoh will not wait to batch this message with others to reduce the bandwith
*/
bool is_express;
/**
* The allowed destination for thsi publisher.
*/
enum zcu_locality_t allowed_destination;
} z_publisher_options_t;
/**
* Options passed to the `z_declare_queryable()` function.
Expand Down Expand Up @@ -468,6 +472,14 @@ typedef struct z_delete_options_t {
* If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
*/
bool is_express;
/**
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
/**
* The allowed destination of this message.
*/
enum zcu_locality_t allowed_destination;
} z_delete_options_t;
/**
* An entity gloabal id.
Expand Down Expand Up @@ -525,7 +537,10 @@ typedef struct z_get_options_t {
* whenever issued via `z_publisher_delete()`.
*/
typedef struct z_publisher_delete_options_t {
uint8_t __dummy;
/**
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
} z_publisher_delete_options_t;
/**
* Options passed to the `z_publisher_put()` function.
Expand All @@ -535,6 +550,10 @@ typedef struct z_publisher_put_options_t {
* The encoding of the data to publish.
*/
struct z_owned_encoding_t *encoding;
/**
* The timestamp of the publication.
*/
struct z_timestamp_t *timestamp;
/**
* The source info for the publication.
*/
Expand Down Expand Up @@ -564,6 +583,14 @@ typedef struct z_put_options_t {
* If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
*/
bool is_express;
/**
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
/**
* The allowed destination of this message.
*/
enum zcu_locality_t allowed_destination;
/**
* The source info for the message.
*/
Expand Down Expand Up @@ -2186,7 +2213,7 @@ ZENOHC_API bool z_publisher_check(const struct z_owned_publisher_t *this_);
*/
ZENOHC_API
z_error_t z_publisher_delete(const struct z_loaned_publisher_t *publisher,
const struct z_publisher_delete_options_t *_options);
const struct z_publisher_delete_options_t *options);
/**
* Constructs the default values for the delete operation via a publisher entity.
*/
Expand Down Expand Up @@ -3311,6 +3338,13 @@ const char *z_time_now_as_str(const char *buf,
* Returns id associated with this timestamp.
*/
ZENOHC_API struct z_id_t z_timestamp_id(const struct z_timestamp_t *this_);
/**
* Create timestamp
*/
ZENOHC_API
z_error_t z_timestamp_new(struct z_timestamp_t *this_,
const struct z_id_t *zid,
uint64_t npt64_time);
/**
* Returns NPT64 time associated with this timestamp.
*/
Expand Down
15 changes: 15 additions & 0 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ impl From<z_sample_kind_t> for SampleKind {
use crate::opaque_types::z_timestamp_t;
decl_transmute_copy!(Timestamp, z_timestamp_t);

/// Create timestamp
#[no_mangle]
pub extern "C" fn z_timestamp_new(
this: &mut z_timestamp_t,
zid: &z_id_t,
npt64_time: u64,
) -> errors::z_error_t {
let timestamp = Timestamp::new(
zenoh::time::NTP64(npt64_time),
(&zid.transmute_copy()).into(),
);
*this = timestamp.transmute_copy();
errors::Z_OK
}

/// Returns NPT64 time associated with this timestamp.
#[no_mangle]
pub extern "C" fn z_timestamp_npt64_time(this: &z_timestamp_t) -> u64 {
Expand Down
39 changes: 34 additions & 5 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ use crate::transmute::TransmuteRef;
use crate::transmute::TransmuteUninitPtr;
use crate::z_owned_encoding_t;
use crate::z_owned_source_info_t;
use crate::z_timestamp_t;
use crate::zcu_closure_matching_status_call;
use crate::zcu_closure_matching_status_loan;
use crate::zcu_locality_default;
use crate::zcu_locality_t;
use crate::zcu_owned_closure_matching_status_t;
use std::mem::MaybeUninit;
use std::ptr;
Expand All @@ -32,6 +35,7 @@ use zenoh::prelude::SessionDeclarations;
use zenoh::publisher::CongestionControl;
use zenoh::sample::QoSBuilderTrait;
use zenoh::sample::SampleBuilderTrait;
use zenoh::sample::TimestampBuilderTrait;
use zenoh::sample::ValueBuilderTrait;
use zenoh::{publisher::MatchingListener, publisher::Priority, publisher::Publisher};

Expand All @@ -48,6 +52,8 @@ pub struct z_publisher_options_t {
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this message with others to reduce the bandwith
pub is_express: bool,
/// The allowed destination for thsi publisher.
pub allowed_destination: zcu_locality_t,
}

/// Constructs the default value for `z_publisher_options_t`.
Expand All @@ -57,6 +63,7 @@ pub extern "C" fn z_publisher_options_default(this: &mut z_publisher_options_t)
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
is_express: false,
allowed_destination: zcu_locality_default(),
};
}

Expand Down Expand Up @@ -94,7 +101,8 @@ pub extern "C" fn z_declare_publisher(
p = p
.congestion_control(options.congestion_control.into())
.priority(options.priority.into())
.express(options.is_express);
.express(options.is_express)
.allowed_destination(options.allowed_destination.into());
}
match p.wait() {
Err(e) => {
Expand Down Expand Up @@ -137,6 +145,8 @@ pub extern "C" fn z_publisher_loan(this: &z_owned_publisher_t) -> &z_loaned_publ
pub struct z_publisher_put_options_t {
/// The encoding of the data to publish.
pub encoding: *mut z_owned_encoding_t,
/// The timestamp of the publication.
pub timestamp: *mut z_timestamp_t,
/// The source info for the publication.
pub source_info: *mut z_owned_source_info_t,
/// The attachment to attach to the publication.
Expand All @@ -149,6 +159,7 @@ pub struct z_publisher_put_options_t {
pub extern "C" fn z_publisher_put_options_default(this: &mut z_publisher_put_options_t) {
*this = z_publisher_put_options_t {
encoding: ptr::null_mut(),
timestamp: ptr::null_mut(),
source_info: ptr::null_mut(),
attachment: ptr::null_mut(),
}
Expand Down Expand Up @@ -198,6 +209,12 @@ pub unsafe extern "C" fn z_publisher_put(
.extract();
put = put.attachment(attachment);
}
if !options.timestamp.is_null() {
let timestamp = *unsafe { options.timestamp.as_mut() }
.unwrap()
.transmute_ref();
put = put.timestamp(Some(timestamp));
}
}

if let Err(e) = put.wait() {
Expand All @@ -212,14 +229,17 @@ pub unsafe extern "C" fn z_publisher_put(
/// whenever issued via `z_publisher_delete()`.
#[repr(C)]
pub struct z_publisher_delete_options_t {
__dummy: u8,
/// The timestamp of this message.
pub timestamp: *mut z_timestamp_t,
}

/// Constructs the default values for the delete operation via a publisher entity.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub extern "C" fn z_publisher_delete_options_default(this: &mut z_publisher_delete_options_t) {
*this = z_publisher_delete_options_t { __dummy: 0 }
*this = z_publisher_delete_options_t {
timestamp: ptr::null_mut(),
}
}
/// Sends a `DELETE` message onto the publisher's key expression.
///
Expand All @@ -228,10 +248,19 @@ pub extern "C" fn z_publisher_delete_options_default(this: &mut z_publisher_dele
#[allow(clippy::missing_safety_doc)]
pub extern "C" fn z_publisher_delete(
publisher: &z_loaned_publisher_t,
_options: Option<&z_publisher_delete_options_t>,
options: Option<&z_publisher_delete_options_t>,
) -> errors::z_error_t {
let publisher = publisher.transmute_ref();
if let Err(e) = publisher.delete().wait() {
let mut del = publisher.delete();
if let Some(options) = options {
if !options.timestamp.is_null() {
let timestamp = *unsafe { options.timestamp.as_mut() }
.unwrap()
.transmute_ref();
del = del.timestamp(Some(timestamp));
}
}
if let Err(e) = del.wait() {
log::error!("{}", e);
errors::Z_EGENERIC
} else {
Expand Down
30 changes: 29 additions & 1 deletion src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use crate::transmute::TransmuteFromHandle;
use crate::transmute::TransmuteRef;
use crate::z_loaned_session_t;
use crate::z_owned_bytes_t;
use crate::z_timestamp_t;
use zenoh::core::Wait;
use zenoh::publisher::CongestionControl;
use zenoh::publisher::Priority;
use zenoh::sample::QoSBuilderTrait;
use zenoh::sample::SampleBuilderTrait;
use zenoh::sample::TimestampBuilderTrait;
use zenoh::sample::ValueBuilderTrait;

/// Options passed to the `z_put()` function.
Expand All @@ -40,6 +42,10 @@ pub struct z_put_options_t {
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
pub is_express: bool,
/// The timestamp of this message.
pub timestamp: *mut z_timestamp_t,
/// The allowed destination of this message.
pub allowed_destination: zcu_locality_t,
/// The source info for the message.
pub source_info: *mut z_owned_source_info_t,
/// The attachment to this message.
Expand All @@ -55,6 +61,8 @@ pub extern "C" fn z_put_options_default(this: &mut z_put_options_t) {
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
is_express: false,
timestamp: null_mut(),
allowed_destination: zcu_locality_default(),
source_info: null_mut(),
attachment: null_mut(),
};
Expand Down Expand Up @@ -103,9 +111,16 @@ pub extern "C" fn z_put(
.extract();
put = put.attachment(attachment);
}
if !options.timestamp.is_null() {
let timestamp = *unsafe { options.timestamp.as_mut() }
.unwrap()
.transmute_ref();
put = put.timestamp(Some(timestamp));
}
put = put.priority(options.priority.into());
put = put.congestion_control(options.congestion_control.into());
put = put.express(options.is_express);
put = put.allowed_destination(options.allowed_destination.into());
}

if let Err(e) = put.wait() {
Expand All @@ -126,6 +141,10 @@ pub struct z_delete_options_t {
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
pub is_express: bool,
/// The timestamp of this message.
pub timestamp: *mut z_timestamp_t,
/// The allowed destination of this message.
pub allowed_destination: zcu_locality_t,
}

/// Constructs the default value for `z_delete_options_t`.
Expand All @@ -136,6 +155,8 @@ pub unsafe extern "C" fn z_delete_options_default(this: *mut z_delete_options_t)
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
is_express: false,
timestamp: null_mut(),
allowed_destination: zcu_locality_default(),
};
}

Expand All @@ -157,10 +178,17 @@ pub extern "C" fn z_delete(
let key_expr = key_expr.transmute_ref();
let mut del = session.delete(key_expr);
if let Some(options) = options {
if !options.timestamp.is_null() {
let timestamp = *unsafe { options.timestamp.as_mut() }
.unwrap()
.transmute_ref();
del = del.timestamp(Some(timestamp));
}
del = del
.congestion_control(options.congestion_control.into())
.priority(options.priority.into())
.express(options.is_express);
.express(options.is_express)
.allowed_destination(options.allowed_destination.into());
}

match del.wait() {
Expand Down
25 changes: 25 additions & 0 deletions tests/z_int_pub_sub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const size_t values_count = sizeof(values) / sizeof(values[0]);

const uint32_t TEST_EID = 42;
const uint64_t TEST_SN = 24;
const uint64_t TEST_TS = 401706000;

int run_publisher() {
SEM_WAIT(sem);
Expand Down Expand Up @@ -62,9 +63,13 @@ int run_publisher() {
z_owned_source_info_t source_info;
z_source_info_new(&source_info, &entity_global_id, TEST_SN);

z_timestamp_t ts;
z_timestamp_new(&ts, &self_id, TEST_TS + i);

z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
options.source_info = &source_info;
options.timestamp = &ts;

z_owned_bytes_t payload;
z_bytes_encode_from_string(&payload, values[i]);
Expand Down Expand Up @@ -116,6 +121,26 @@ void data_handler(const z_loaned_sample_t *sample, void *arg) {
perror("Unexpected eid value");
exit(-1);
}

const z_timestamp_t *ts = z_sample_timestamp(sample);
if (ts == NULL) {
perror("Unexpected null timestamp");
exit(-1);
}
const uint64_t time = z_timestamp_npt64_time(ts);
if (time != TEST_TS + val_num) {
perror("Unexpected timestamp value");
exit(-1);
}

z_id_t ts_id = z_timestamp_id(ts);
z_id_t gloabl_id = z_entity_global_id_zid(&id);

if (memcmp(ts_id.id, gloabl_id.id, sizeof(ts_id.id)) != 0) {
perror("Timestamp id and global id differ");
exit(-1);
}

if (++val_num == values_count) {
exit(0);
};
Expand Down
Loading