From f6aa07d685122f85ff6cfb49afac220c5521b411 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 7 Jun 2024 19:07:06 +0200 Subject: [PATCH] Add missing properties to z_query_reply_options_t --- include/zenoh_commons.h | 16 +++++++++ src/queryable.rs | 34 +++++++++++++++--- tests/z_int_queryable_test.c | 68 +++++++++++++++++++++++++++++++++--- 3 files changed, 110 insertions(+), 8 deletions(-) diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index a50e81976..ef135c1b3 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -609,6 +609,22 @@ typedef struct z_query_reply_options_t { * The encoding of the reply payload. */ struct z_owned_encoding_t *encoding; + /** + * The congestion control to apply when routing the reply. + */ + enum z_congestion_control_t congestion_control; + /** + * The priority of the reply. + */ + enum z_priority_t priority; + /** + * If true, Zenoh will not wait to batch this operation with others to reduce the bandwith. + */ + bool is_express; + /** + * The timestamp of the reply. + */ + struct z_timestamp_t *timestamp; /** * The source info for the reply. */ diff --git a/src/queryable.rs b/src/queryable.rs index a26f54b90..48319d022 100644 --- a/src/queryable.rs +++ b/src/queryable.rs @@ -16,17 +16,22 @@ use crate::transmute::{ TransmuteUninitPtr, }; use crate::{ - errors, z_closure_query_call, z_closure_query_loan, z_loaned_bytes_t, z_loaned_keyexpr_t, - z_loaned_session_t, z_loaned_value_t, z_owned_bytes_t, z_owned_closure_query_t, - z_owned_encoding_t, z_owned_source_info_t, z_view_string_from_substring, z_view_string_t, + errors, z_closure_query_call, z_closure_query_loan, z_congestion_control_t, z_loaned_bytes_t, + z_loaned_keyexpr_t, z_loaned_session_t, z_loaned_value_t, z_owned_bytes_t, + z_owned_closure_query_t, z_owned_encoding_t, z_owned_source_info_t, z_priority_t, + z_timestamp_t, z_view_string_from_substring, z_view_string_t, }; use std::mem::MaybeUninit; use std::ptr::null_mut; use zenoh::core::Wait; use zenoh::encoding::Encoding; use zenoh::prelude::SessionDeclarations; +use zenoh::publisher::CongestionControl; +use zenoh::publisher::Priority; use zenoh::queryable::{Query, Queryable}; -use zenoh::sample::{SampleBuilderTrait, ValueBuilderTrait}; +use zenoh::sample::{ + QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, +}; pub use crate::opaque_types::z_owned_queryable_t; decl_transmute_owned!(Option>, z_owned_queryable_t); @@ -110,6 +115,14 @@ pub extern "C" fn z_queryable_options_default(this: &mut z_queryable_options_t) pub struct z_query_reply_options_t { /// The encoding of the reply payload. pub encoding: *mut z_owned_encoding_t, + /// The congestion control to apply when routing the reply. + pub congestion_control: z_congestion_control_t, + /// The priority of the reply. + pub priority: z_priority_t, + /// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith. + pub is_express: bool, + /// The timestamp of the reply. + pub timestamp: *mut z_timestamp_t, /// The source info for the reply. pub source_info: *mut z_owned_source_info_t, /// The attachment to this reply. @@ -122,6 +135,10 @@ pub struct z_query_reply_options_t { pub extern "C" fn z_query_reply_options_default(this: &mut z_query_reply_options_t) { *this = z_query_reply_options_t { encoding: null_mut(), + congestion_control: CongestionControl::default().into(), + priority: Priority::default().into(), + is_express: false, + timestamp: null_mut(), source_info: null_mut(), attachment: null_mut(), }; @@ -256,6 +273,15 @@ pub extern "C" fn z_query_reply( let attachment = attachment.transmute_mut().extract(); reply = reply.attachment(attachment); } + if !options.timestamp.is_null() { + let timestamp = *unsafe { options.timestamp.as_mut() } + .unwrap() + .transmute_ref(); + reply = reply.timestamp(Some(timestamp)); + } + reply = reply.priority(options.priority.into()); + reply = reply.congestion_control(options.congestion_control.into()); + reply = reply.express(options.is_express); } if let Err(e) = reply.wait() { diff --git a/tests/z_int_queryable_test.c b/tests/z_int_queryable_test.c index 972886b1a..f6e0a0a4a 100644 --- a/tests/z_int_queryable_test.c +++ b/tests/z_int_queryable_test.c @@ -25,21 +25,40 @@ const char *const keyexpr = "test/key"; const char *const values[] = {"test_value_1", "test_value_2", "test_value_3"}; const size_t values_count = sizeof(values) / sizeof(values[0]); +const uint32_t TEST_EID = 42; +const uint64_t TEST_SN = 24; +const uint64_t TEST_TS = 401706000; +const uint8_t TEST_ID = 123; + void query_handler(const z_loaned_query_t *query, void *context) { static int value_num = 0; z_view_string_t params; z_query_parameters(query, ¶ms); - const z_loaned_value_t* payload_value = z_query_value(query); + const z_loaned_value_t *payload_value = z_query_value(query); z_query_reply_options_t options; z_query_reply_options_default(&options); - + + z_id_t self_id; + self_id.id[0] = TEST_ID; + + z_entity_global_id_t entity_global_id; + z_entity_global_id_new(&entity_global_id, &self_id, TEST_EID); + z_owned_source_info_t source_info; + z_source_info_new(&source_info, &entity_global_id, TEST_SN); + + z_timestamp_t ts; + z_timestamp_new(&ts, &self_id, TEST_TS + value_num); + + options.source_info = &source_info; + options.timestamp = &ts; + z_owned_bytes_t payload; z_bytes_encode_from_string(&payload, values[value_num]); z_view_keyexpr_t reply_ke; - z_view_keyexpr_from_string(&reply_ke, (const char*)context); + z_view_keyexpr_from_string(&reply_ke, (const char *)context); z_query_reply(query, z_loan(reply_ke), z_move(payload), &options); if (++value_num == values_count) { @@ -100,7 +119,7 @@ int run_get() { for (z_recv(z_loan(handler), &reply); z_check(reply); z_recv(z_loan(handler), &reply)) { assert(z_reply_is_ok(z_loan(reply))); - const z_loaned_sample_t* sample = z_reply_ok(z_loan(reply)); + const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply)); z_owned_string_t payload_string; z_bytes_decode_into_string(z_sample_payload(sample), &payload_string); if (strncmp(values[val_num], z_string_data(z_loan(payload_string)), z_string_len(z_loan(payload_string)))) { @@ -109,6 +128,47 @@ int run_get() { exit(-1); } + const z_loaned_source_info_t *source_info = z_sample_source_info(sample); + if (source_info == NULL) { + perror("Unexpected null source_info"); + exit(-1); + } + const uint64_t sn = z_source_info_sn(source_info); + if (sn != TEST_SN) { + perror("Unexpected sn value"); + exit(-1); + } + const z_entity_global_id_t id = z_source_info_id(source_info); + uint32_t eid = z_entity_global_id_eid(&id); + if (eid != TEST_EID) { + perror("Unexpected eid value"); + exit(-1); + } + + const z_timestamp_t *ts = z_sample_timestamp(sample); + if (ts == NULL) { + perror("Unexpected null timestamp"); + exit(-1); + } + const uint64_t time = z_timestamp_npt64_time(ts); + if (time != TEST_TS + val_num) { + perror("Unexpected timestamp value"); + exit(-1); + } + + z_id_t ts_id = z_timestamp_id(ts); + z_id_t gloabl_id = z_entity_global_id_zid(&id); + + if (memcmp(ts_id.id, gloabl_id.id, sizeof(ts_id.id)) != 0) { + perror("Timestamp id and global id differ"); + exit(-1); + } + + if (ts_id.id[0] != TEST_ID) { + perror("Unexpected id value"); + exit(-1); + } + z_drop(z_move(payload_string)); z_drop(z_move(reply)); }