Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing properties to z_query_reply_options_t #431

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,22 @@ typedef struct z_query_reply_options_t {
* The encoding of the reply payload.
*/
struct z_owned_encoding_t *encoding;
/**
* The congestion control to apply when routing the reply.
*/
enum z_congestion_control_t congestion_control;
/**
* The priority of the reply.
*/
enum z_priority_t priority;
/**
* If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
*/
bool is_express;
/**
* The timestamp of the reply.
*/
struct z_timestamp_t *timestamp;
/**
* The source info for the reply.
*/
Expand Down
34 changes: 30 additions & 4 deletions src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ use crate::transmute::{
TransmuteUninitPtr,
};
use crate::{
errors, z_closure_query_call, z_closure_query_loan, z_loaned_bytes_t, z_loaned_keyexpr_t,
z_loaned_session_t, z_loaned_value_t, z_owned_bytes_t, z_owned_closure_query_t,
z_owned_encoding_t, z_owned_source_info_t, z_view_string_from_substring, z_view_string_t,
errors, z_closure_query_call, z_closure_query_loan, z_congestion_control_t, z_loaned_bytes_t,
z_loaned_keyexpr_t, z_loaned_session_t, z_loaned_value_t, z_owned_bytes_t,
z_owned_closure_query_t, z_owned_encoding_t, z_owned_source_info_t, z_priority_t,
z_timestamp_t, z_view_string_from_substring, z_view_string_t,
};
use std::mem::MaybeUninit;
use std::ptr::null_mut;
use zenoh::core::Wait;
use zenoh::encoding::Encoding;
use zenoh::prelude::SessionDeclarations;
use zenoh::publisher::CongestionControl;
use zenoh::publisher::Priority;
use zenoh::queryable::{Query, Queryable};
use zenoh::sample::{SampleBuilderTrait, ValueBuilderTrait};
use zenoh::sample::{
QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait,
};

pub use crate::opaque_types::z_owned_queryable_t;
decl_transmute_owned!(Option<Queryable<'static, ()>>, z_owned_queryable_t);
Expand Down Expand Up @@ -110,6 +115,14 @@ pub extern "C" fn z_queryable_options_default(this: &mut z_queryable_options_t)
pub struct z_query_reply_options_t {
/// The encoding of the reply payload.
pub encoding: *mut z_owned_encoding_t,
/// The congestion control to apply when routing the reply.
pub congestion_control: z_congestion_control_t,
/// The priority of the reply.
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
pub is_express: bool,
/// The timestamp of the reply.
pub timestamp: *mut z_timestamp_t,
/// The source info for the reply.
pub source_info: *mut z_owned_source_info_t,
/// The attachment to this reply.
Expand All @@ -122,6 +135,10 @@ pub struct z_query_reply_options_t {
pub extern "C" fn z_query_reply_options_default(this: &mut z_query_reply_options_t) {
*this = z_query_reply_options_t {
encoding: null_mut(),
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
is_express: false,
timestamp: null_mut(),
source_info: null_mut(),
attachment: null_mut(),
};
Expand Down Expand Up @@ -256,6 +273,15 @@ pub extern "C" fn z_query_reply(
let attachment = attachment.transmute_mut().extract();
reply = reply.attachment(attachment);
}
if !options.timestamp.is_null() {
let timestamp = *unsafe { options.timestamp.as_mut() }
.unwrap()
.transmute_ref();
reply = reply.timestamp(Some(timestamp));
}
reply = reply.priority(options.priority.into());
reply = reply.congestion_control(options.congestion_control.into());
reply = reply.express(options.is_express);
}

if let Err(e) = reply.wait() {
Expand Down
68 changes: 64 additions & 4 deletions tests/z_int_queryable_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,40 @@ const char *const keyexpr = "test/key";
const char *const values[] = {"test_value_1", "test_value_2", "test_value_3"};
const size_t values_count = sizeof(values) / sizeof(values[0]);

const uint32_t TEST_EID = 42;
const uint64_t TEST_SN = 24;
const uint64_t TEST_TS = 401706000;
const uint8_t TEST_ID = 123;

void query_handler(const z_loaned_query_t *query, void *context) {
static int value_num = 0;

z_view_string_t params;
z_query_parameters(query, &params);
const z_loaned_value_t* payload_value = z_query_value(query);
const z_loaned_value_t *payload_value = z_query_value(query);

z_query_reply_options_t options;
z_query_reply_options_default(&options);


z_id_t self_id;
self_id.id[0] = TEST_ID;

z_entity_global_id_t entity_global_id;
z_entity_global_id_new(&entity_global_id, &self_id, TEST_EID);
z_owned_source_info_t source_info;
z_source_info_new(&source_info, &entity_global_id, TEST_SN);

z_timestamp_t ts;
z_timestamp_new(&ts, &self_id, TEST_TS + value_num);

options.source_info = &source_info;
options.timestamp = &ts;

z_owned_bytes_t payload;
z_bytes_encode_from_string(&payload, values[value_num]);

z_view_keyexpr_t reply_ke;
z_view_keyexpr_from_string(&reply_ke, (const char*)context);
z_view_keyexpr_from_string(&reply_ke, (const char *)context);
z_query_reply(query, z_loan(reply_ke), z_move(payload), &options);

if (++value_num == values_count) {
Expand Down Expand Up @@ -100,7 +119,7 @@ int run_get() {
for (z_recv(z_loan(handler), &reply); z_check(reply); z_recv(z_loan(handler), &reply)) {
assert(z_reply_is_ok(z_loan(reply)));

const z_loaned_sample_t* sample = z_reply_ok(z_loan(reply));
const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply));
z_owned_string_t payload_string;
z_bytes_decode_into_string(z_sample_payload(sample), &payload_string);
if (strncmp(values[val_num], z_string_data(z_loan(payload_string)), z_string_len(z_loan(payload_string)))) {
Expand All @@ -109,6 +128,47 @@ int run_get() {
exit(-1);
}

const z_loaned_source_info_t *source_info = z_sample_source_info(sample);
if (source_info == NULL) {
perror("Unexpected null source_info");
exit(-1);
}
const uint64_t sn = z_source_info_sn(source_info);
if (sn != TEST_SN) {
perror("Unexpected sn value");
exit(-1);
}
const z_entity_global_id_t id = z_source_info_id(source_info);
uint32_t eid = z_entity_global_id_eid(&id);
if (eid != TEST_EID) {
perror("Unexpected eid value");
exit(-1);
}

const z_timestamp_t *ts = z_sample_timestamp(sample);
if (ts == NULL) {
perror("Unexpected null timestamp");
exit(-1);
}
const uint64_t time = z_timestamp_npt64_time(ts);
if (time != TEST_TS + val_num) {
perror("Unexpected timestamp value");
exit(-1);
}

z_id_t ts_id = z_timestamp_id(ts);
z_id_t gloabl_id = z_entity_global_id_zid(&id);

if (memcmp(ts_id.id, gloabl_id.id, sizeof(ts_id.id)) != 0) {
perror("Timestamp id and global id differ");
exit(-1);
}

if (ts_id.id[0] != TEST_ID) {
perror("Unexpected id value");
exit(-1);
}

z_drop(z_move(payload_string));
z_drop(z_move(reply));
}
Expand Down
Loading