From 66dda9ceeca25d17556753b20afe3ec9486e3ccc Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 12 Jun 2024 16:10:38 +0200 Subject: [PATCH] Transport tests cleanup --- .../tests/multicast_compression.rs | 611 +++++----- .../tests/multicast_transport.rs | 605 +++++----- io/zenoh-transport/tests/unicast_multilink.rs | 1065 ++++++++--------- io/zenoh-transport/tests/unicast_shm.rs | 728 ++++++----- .../tests/unicast_simultaneous.rs | 619 +++++----- io/zenoh-transport/tests/unicast_transport.rs | 54 +- 6 files changed, 1816 insertions(+), 1866 deletions(-) diff --git a/io/zenoh-transport/tests/multicast_compression.rs b/io/zenoh-transport/tests/multicast_compression.rs index e046c96958..7dd6296164 100644 --- a/io/zenoh-transport/tests/multicast_compression.rs +++ b/io/zenoh-transport/tests/multicast_compression.rs @@ -14,353 +14,350 @@ // Restricting to macos by default because of no IPv6 support // on GitHub CI actions on Linux and Windows. -#[cfg(all(target_family = "unix", feature = "transport_compression"))] -mod tests { - use std::{ - any::Any, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, +#![cfg(all(target_family = "unix", feature = "transport_compression"))] + +use std::{ + any::Any, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; +use zenoh_core::ztimeout; +use zenoh_link::Link; +use zenoh_protocol::{ + core::{ + Channel, CongestionControl, Encoding, EndPoint, Priority, Reliability, WhatAmI, ZenohId, + }, + network::{ + push::{ + ext::{NodeIdType, QoSType}, + Push, }, - time::Duration, - }; - use zenoh_core::ztimeout; - use zenoh_link::Link; - use zenoh_protocol::{ - core::{ - Channel, CongestionControl, Encoding, EndPoint, Priority, Reliability, WhatAmI, ZenohId, - }, - network::{ - push::{ - ext::{NodeIdType, QoSType}, - Push, - }, - NetworkMessage, - }, - zenoh::Put, - }; - use zenoh_result::ZResult; - use zenoh_transport::{ - multicast::{TransportManagerBuilderMulticast, TransportMulticast}, - unicast::TransportUnicast, - TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, - TransportPeerEventHandler, - }; - - const TIMEOUT: Duration = Duration::from_secs(60); - const SLEEP: Duration = Duration::from_secs(1); - const SLEEP_COUNT: Duration = Duration::from_millis(10); + NetworkMessage, + }, + zenoh::Put, +}; +use zenoh_result::ZResult; +use zenoh_transport::{ + multicast::{TransportManagerBuilderMulticast, TransportMulticast}, + unicast::TransportUnicast, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); +const SLEEP_COUNT: Duration = Duration::from_millis(10); + +const MSG_COUNT: usize = 1_000; +const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; + +// Transport Handler for the peer02 +struct SHPeer { + count: Arc, +} - const MSG_COUNT: usize = 1_000; - const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; +impl Default for SHPeer { + fn default() -> Self { + Self { + count: Arc::new(AtomicUsize::new(0)), + } + } +} - // Transport Handler for the peer02 - struct SHPeer { - count: Arc, +impl SHPeer { + fn get_count(&self) -> usize { + self.count.load(Ordering::Relaxed) } +} - impl Default for SHPeer { - fn default() -> Self { - Self { - count: Arc::new(AtomicUsize::new(0)), - } - } +impl TransportEventHandler for SHPeer { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + panic!(); } - impl 
SHPeer { - fn get_count(&self) -> usize { - self.count.load(Ordering::Relaxed) - } + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + let arc = Arc::new(SCPeer::new(self.count.clone())); + Ok(arc) } +} - impl TransportEventHandler for SHPeer { - fn new_unicast( - &self, - _peer: TransportPeer, - _transport: TransportUnicast, - ) -> ZResult> { - panic!(); - } +// Transport Callback for the peer02 +pub struct SCPeer { + count: Arc, +} - fn new_multicast( - &self, - _transport: TransportMulticast, - ) -> ZResult> { - let arc = Arc::new(SCPeer::new(self.count.clone())); - Ok(arc) - } +impl SCPeer { + pub fn new(count: Arc) -> Self { + Self { count } } +} - // Transport Callback for the peer02 - pub struct SCPeer { - count: Arc, +impl TransportMulticastEventHandler for SCPeer { + fn new_peer(&self, peer: TransportPeer) -> ZResult> { + println!("\tNew peer: {:?}", peer); + Ok(Arc::new(SCPeer { + count: self.count.clone(), + })) } + fn closing(&self) {} + fn closed(&self) {} - impl SCPeer { - pub fn new(count: Arc) -> Self { - Self { count } - } + fn as_any(&self) -> &dyn Any { + self } +} - impl TransportMulticastEventHandler for SCPeer { - fn new_peer(&self, peer: TransportPeer) -> ZResult> { - println!("\tNew peer: {:?}", peer); - Ok(Arc::new(SCPeer { - count: self.count.clone(), - })) - } - fn closing(&self) {} - fn closed(&self) {} - - fn as_any(&self) -> &dyn Any { - self - } +impl TransportPeerEventHandler for SCPeer { + fn handle_message(&self, _msg: NetworkMessage) -> ZResult<()> { + self.count.fetch_add(1, Ordering::Relaxed); + Ok(()) } - impl TransportPeerEventHandler for SCPeer { - fn handle_message(&self, _msg: NetworkMessage) -> ZResult<()> { - self.count.fetch_add(1, Ordering::Relaxed); - Ok(()) - } - - fn new_link(&self, _link: Link) {} - fn del_link(&self, _link: Link) {} - fn closing(&self) {} - fn closed(&self) {} + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closing(&self) {} + fn closed(&self) {} - fn as_any(&self) -> &dyn Any { - self - } + fn as_any(&self) -> &dyn Any { + self } +} - struct TransportMulticastPeer { - manager: TransportManager, - handler: Arc, - transport: TransportMulticast, - } +struct TransportMulticastPeer { + manager: TransportManager, + handler: Arc, + transport: TransportMulticast, +} - async fn open_transport( - endpoint: &EndPoint, - ) -> (TransportMulticastPeer, TransportMulticastPeer) { - // Define peer01 and peer02 IDs - let peer01_id = ZenohId::try_from([1]).unwrap(); - let peer02_id = ZenohId::try_from([2]).unwrap(); - - // Create the peer01 transport manager - let peer01_handler = Arc::new(SHPeer::default()); - let peer01_manager = TransportManager::builder() - .zid(peer01_id) - .whatami(WhatAmI::Peer) - .multicast(TransportManagerBuilderMulticast::default().compression(true)) - .build(peer01_handler.clone()) - .unwrap(); - - // Create the peer02 transport manager - let peer02_handler = Arc::new(SHPeer::default()); - let peer02_manager = TransportManager::builder() - .zid(peer02_id) - .whatami(WhatAmI::Peer) - .multicast(TransportManagerBuilderMulticast::default().compression(true)) - .build(peer02_handler.clone()) - .unwrap(); - - // Create an empty transport with the peer01 - // Open transport -> This should be accepted - println!("Opening transport with {endpoint}"); - let _ = ztimeout!(peer01_manager.open_transport_multicast(endpoint.clone())).unwrap(); - assert!(!peer01_manager.get_transports_multicast().await.is_empty()); - println!("\t{:?}", 
peer01_manager.get_transports_multicast().await); - - println!("Opening transport with {endpoint}"); - let _ = ztimeout!(peer02_manager.open_transport_multicast(endpoint.clone())).unwrap(); - assert!(!peer02_manager.get_transports_multicast().await.is_empty()); - println!("\t{:?}", peer02_manager.get_transports_multicast().await); - - // Wait to for peer 01 and 02 to join each other - ztimeout!(async { - while peer01_manager - .get_transport_multicast(&peer02_id) - .await - .is_none() - { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - let peer01_transport = peer01_manager +async fn open_transport(endpoint: &EndPoint) -> (TransportMulticastPeer, TransportMulticastPeer) { + // Define peer01 and peer02 IDs + let peer01_id = ZenohId::try_from([1]).unwrap(); + let peer02_id = ZenohId::try_from([2]).unwrap(); + + // Create the peer01 transport manager + let peer01_handler = Arc::new(SHPeer::default()); + let peer01_manager = TransportManager::builder() + .zid(peer01_id) + .whatami(WhatAmI::Peer) + .multicast(TransportManagerBuilderMulticast::default().compression(true)) + .build(peer01_handler.clone()) + .unwrap(); + + // Create the peer02 transport manager + let peer02_handler = Arc::new(SHPeer::default()); + let peer02_manager = TransportManager::builder() + .zid(peer02_id) + .whatami(WhatAmI::Peer) + .multicast(TransportManagerBuilderMulticast::default().compression(true)) + .build(peer02_handler.clone()) + .unwrap(); + + // Create an empty transport with the peer01 + // Open transport -> This should be accepted + println!("Opening transport with {endpoint}"); + let _ = ztimeout!(peer01_manager.open_transport_multicast(endpoint.clone())).unwrap(); + assert!(!peer01_manager.get_transports_multicast().await.is_empty()); + println!("\t{:?}", peer01_manager.get_transports_multicast().await); + + println!("Opening transport with {endpoint}"); + let _ = ztimeout!(peer02_manager.open_transport_multicast(endpoint.clone())).unwrap(); + assert!(!peer02_manager.get_transports_multicast().await.is_empty()); + println!("\t{:?}", peer02_manager.get_transports_multicast().await); + + // Wait to for peer 01 and 02 to join each other + ztimeout!(async { + while peer01_manager .get_transport_multicast(&peer02_id) .await - .unwrap(); - println!( - "\tPeer01 peers: {:?}", - peer01_transport.get_peers().unwrap() - ); - - ztimeout!(async { - while peer02_manager - .get_transport_multicast(&peer01_id) - .await - .is_none() - { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - let peer02_transport = peer02_manager + .is_none() + { + tokio::time::sleep(SLEEP_COUNT).await; + } + }); + let peer01_transport = peer01_manager + .get_transport_multicast(&peer02_id) + .await + .unwrap(); + println!( + "\tPeer01 peers: {:?}", + peer01_transport.get_peers().unwrap() + ); + + ztimeout!(async { + while peer02_manager .get_transport_multicast(&peer01_id) .await - .unwrap(); - println!( - "\tPeer02 peers: {:?}", - peer02_transport.get_peers().unwrap() - ); - - ( - TransportMulticastPeer { - manager: peer01_manager, - handler: peer01_handler, - transport: peer01_transport, - }, - TransportMulticastPeer { - manager: peer02_manager, - handler: peer02_handler, - transport: peer02_transport, - }, - ) - } - - async fn close_transport( - peer01: TransportMulticastPeer, - peer02: TransportMulticastPeer, - endpoint: &EndPoint, - ) { - // Close the peer01 transport - println!("Closing transport with {endpoint}"); - ztimeout!(peer01.transport.close()).unwrap(); - assert!(peer01.manager.get_transports_multicast().await.is_empty()); 
- ztimeout!(async { - while !peer02.transport.get_peers().unwrap().is_empty() { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - - // Close the peer02 transport - println!("Closing transport with {endpoint}"); - ztimeout!(peer02.transport.close()).unwrap(); - assert!(peer02.manager.get_transports_multicast().await.is_empty()); - - // Wait a little bit - tokio::time::sleep(SLEEP).await; - } - - async fn test_transport( - peer01: &TransportMulticastPeer, - peer02: &TransportMulticastPeer, - channel: Channel, - msg_size: usize, - ) { - // Create the message to send - let message: NetworkMessage = Push { - wire_expr: "test".into(), - ext_qos: QoSType::new(channel.priority, CongestionControl::Block, false), - ext_tstamp: None, - ext_nodeid: NodeIdType::default(), - payload: Put { - payload: vec![0u8; msg_size].into(), - timestamp: None, - encoding: Encoding::default(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: None, - ext_unknown: vec![], - } - .into(), + .is_none() + { + tokio::time::sleep(SLEEP_COUNT).await; } - .into(); + }); + let peer02_transport = peer02_manager + .get_transport_multicast(&peer01_id) + .await + .unwrap(); + println!( + "\tPeer02 peers: {:?}", + peer02_transport.get_peers().unwrap() + ); + + ( + TransportMulticastPeer { + manager: peer01_manager, + handler: peer01_handler, + transport: peer01_transport, + }, + TransportMulticastPeer { + manager: peer02_manager, + handler: peer02_handler, + transport: peer02_transport, + }, + ) +} - println!("Sending {MSG_COUNT} messages... {channel:?} {msg_size}"); - for _ in 0..MSG_COUNT { - peer01.transport.schedule(message.clone()).unwrap(); +async fn close_transport( + peer01: TransportMulticastPeer, + peer02: TransportMulticastPeer, + endpoint: &EndPoint, +) { + // Close the peer01 transport + println!("Closing transport with {endpoint}"); + ztimeout!(peer01.transport.close()).unwrap(); + assert!(peer01.manager.get_transports_multicast().await.is_empty()); + ztimeout!(async { + while !peer02.transport.get_peers().unwrap().is_empty() { + tokio::time::sleep(SLEEP_COUNT).await; } + }); - match channel.reliability { - Reliability::Reliable => { - ztimeout!(async { - while peer02.handler.get_count() != MSG_COUNT { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - } - Reliability::BestEffort => { - ztimeout!(async { - while peer02.handler.get_count() == 0 { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - } - }; + // Close the peer02 transport + println!("Closing transport with {endpoint}"); + ztimeout!(peer02.transport.close()).unwrap(); + assert!(peer02.manager.get_transports_multicast().await.is_empty()); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} - // Wait a little bit - tokio::time::sleep(SLEEP).await; +async fn test_transport( + peer01: &TransportMulticastPeer, + peer02: &TransportMulticastPeer, + channel: Channel, + msg_size: usize, +) { + // Create the message to send + let message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: QoSType::new(channel.priority, CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::default(), + payload: Put { + payload: vec![0u8; msg_size].into(), + timestamp: None, + encoding: Encoding::default(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + } + .into(), } + .into(); - async fn run_single(endpoint: &EndPoint, channel: Channel, msg_size: usize) { - let (peer01, peer02) = open_transport(endpoint).await; - 
test_transport(&peer01, &peer02, channel, msg_size).await; + println!("Sending {MSG_COUNT} messages... {channel:?} {msg_size}"); + for _ in 0..MSG_COUNT { + peer01.transport.schedule(message.clone()).unwrap(); + } - #[cfg(feature = "stats")] - { - let stats = peer01.transport.get_stats().unwrap().report(); - println!("\tPeer 01: {:?}", stats); - let stats = peer02.transport.get_stats().unwrap().report(); - println!("\tPeer 02: {:?}", stats); + match channel.reliability { + Reliability::Reliable => { + ztimeout!(async { + while peer02.handler.get_count() != MSG_COUNT { + tokio::time::sleep(SLEEP_COUNT).await; + } + }); + } + Reliability::BestEffort => { + ztimeout!(async { + while peer02.handler.get_count() == 0 { + tokio::time::sleep(SLEEP_COUNT).await; + } + }); } + }; + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn run_single(endpoint: &EndPoint, channel: Channel, msg_size: usize) { + let (peer01, peer02) = open_transport(endpoint).await; + test_transport(&peer01, &peer02, channel, msg_size).await; - close_transport(peer01, peer02, endpoint).await; + #[cfg(feature = "stats")] + { + let stats = peer01.transport.get_stats().unwrap().report(); + println!("\tPeer 01: {:?}", stats); + let stats = peer02.transport.get_stats().unwrap().report(); + println!("\tPeer 02: {:?}", stats); } - async fn run(endpoints: &[EndPoint], channel: &[Channel], msg_size: &[usize]) { - for e in endpoints.iter() { - for ch in channel.iter() { - for ms in msg_size.iter() { - run_single(e, *ch, *ms).await; - } + close_transport(peer01, peer02, endpoint).await; +} + +async fn run(endpoints: &[EndPoint], channel: &[Channel], msg_size: &[usize]) { + for e in endpoints.iter() { + for ch in channel.iter() { + for ms in msg_size.iter() { + run_single(e, *ch, *ms).await; } } } +} - #[cfg(feature = "transport_udp")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_multicast_compression_udp_only() { - zenoh_util::try_init_log_from_env(); - - // Define the locator - let endpoints: Vec = vec![ - format!( - "udp/224.{}.{}.{}:21000", - rand::random::(), - rand::random::(), - rand::random::() - ) - .parse() - .unwrap(), - // Disabling by default because of no IPv6 support - // on GitHub CI actions. - // format!("udp/{}", ZN_MULTICAST_IPV6_ADDRESS_DEFAULT) - // .parse() - // .unwrap(), - ]; - // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::default(), - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; - // Run - run(&endpoints, &channel, &MSG_SIZE_NOFRAG).await; - } +#[cfg(feature = "transport_udp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_multicast_compression_udp_only() { + zenoh_util::try_init_log_from_env(); + + // Define the locator + let endpoints: Vec = vec![ + format!( + "udp/224.{}.{}.{}:21000", + rand::random::(), + rand::random::(), + rand::random::() + ) + .parse() + .unwrap(), + // Disabling by default because of no IPv6 support + // on GitHub CI actions. 
+ // format!("udp/{}", ZN_MULTICAST_IPV6_ADDRESS_DEFAULT) + // .parse() + // .unwrap(), + ]; + // Define the reliability and congestion control + let channel = [ + Channel { + priority: Priority::default(), + reliability: Reliability::BestEffort, + }, + Channel { + priority: Priority::RealTime, + reliability: Reliability::BestEffort, + }, + ]; + // Run + run(&endpoints, &channel, &MSG_SIZE_NOFRAG).await; } diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index 87422daf2a..4c04748f81 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -14,350 +14,347 @@ // Restricting to macos by default because of no IPv6 support // on GitHub CI actions on Linux and Windows. -#[cfg(target_family = "unix")] -#[cfg(all(feature = "transport_compression", feature = "transport_udp"))] -mod tests { - use std::{ - any::Any, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, - }; - use zenoh_core::ztimeout; - use zenoh_link::Link; - use zenoh_protocol::{ - core::{ - Channel, CongestionControl, Encoding, EndPoint, Priority, Reliability, WhatAmI, ZenohId, +#![cfg(target_family = "unix")] +#![cfg(all(feature = "transport_compression", feature = "transport_udp"))] + +use std::{ + any::Any, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; +use zenoh_core::ztimeout; +use zenoh_link::Link; +use zenoh_protocol::{ + core::{ + Channel, CongestionControl, Encoding, EndPoint, Priority, Reliability, WhatAmI, ZenohId, + }, + network::{ + push::{ + ext::{NodeIdType, QoSType}, + Push, }, - network::{ - push::{ - ext::{NodeIdType, QoSType}, - Push, - }, - NetworkMessage, - }, - zenoh::Put, - }; - use zenoh_result::ZResult; - use zenoh_transport::{ - multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, - TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, - }; - - const TIMEOUT: Duration = Duration::from_secs(60); - const SLEEP: Duration = Duration::from_secs(1); - const SLEEP_COUNT: Duration = Duration::from_millis(10); + NetworkMessage, + }, + zenoh::Put, +}; +use zenoh_result::ZResult; +use zenoh_transport::{ + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); +const SLEEP_COUNT: Duration = Duration::from_millis(10); + +const MSG_COUNT: usize = 1_000; +const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; + +// Transport Handler for the peer02 +struct SHPeer { + count: Arc, +} - const MSG_COUNT: usize = 1_000; - const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; +impl Default for SHPeer { + fn default() -> Self { + Self { + count: Arc::new(AtomicUsize::new(0)), + } + } +} - // Transport Handler for the peer02 - struct SHPeer { - count: Arc, +impl SHPeer { + fn get_count(&self) -> usize { + self.count.load(Ordering::Relaxed) } +} - impl Default for SHPeer { - fn default() -> Self { - Self { - count: Arc::new(AtomicUsize::new(0)), - } - } +impl TransportEventHandler for SHPeer { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + panic!(); } - impl SHPeer { - fn get_count(&self) -> usize { - self.count.load(Ordering::Relaxed) - } + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + let arc = 
Arc::new(SCPeer::new(self.count.clone())); + Ok(arc) } +} - impl TransportEventHandler for SHPeer { - fn new_unicast( - &self, - _peer: TransportPeer, - _transport: TransportUnicast, - ) -> ZResult> { - panic!(); - } +// Transport Callback for the peer02 +pub struct SCPeer { + count: Arc, +} - fn new_multicast( - &self, - _transport: TransportMulticast, - ) -> ZResult> { - let arc = Arc::new(SCPeer::new(self.count.clone())); - Ok(arc) - } +impl SCPeer { + pub fn new(count: Arc) -> Self { + Self { count } } +} - // Transport Callback for the peer02 - pub struct SCPeer { - count: Arc, +impl TransportMulticastEventHandler for SCPeer { + fn new_peer(&self, peer: TransportPeer) -> ZResult> { + println!("\tNew peer: {:?}", peer); + Ok(Arc::new(SCPeer { + count: self.count.clone(), + })) } + fn closing(&self) {} + fn closed(&self) {} - impl SCPeer { - pub fn new(count: Arc) -> Self { - Self { count } - } + fn as_any(&self) -> &dyn Any { + self } +} - impl TransportMulticastEventHandler for SCPeer { - fn new_peer(&self, peer: TransportPeer) -> ZResult> { - println!("\tNew peer: {:?}", peer); - Ok(Arc::new(SCPeer { - count: self.count.clone(), - })) - } - fn closing(&self) {} - fn closed(&self) {} - - fn as_any(&self) -> &dyn Any { - self - } +impl TransportPeerEventHandler for SCPeer { + fn handle_message(&self, _msg: NetworkMessage) -> ZResult<()> { + self.count.fetch_add(1, Ordering::Relaxed); + Ok(()) } - impl TransportPeerEventHandler for SCPeer { - fn handle_message(&self, _msg: NetworkMessage) -> ZResult<()> { - self.count.fetch_add(1, Ordering::Relaxed); - Ok(()) - } - - fn new_link(&self, _link: Link) {} - fn del_link(&self, _link: Link) {} - fn closing(&self) {} - fn closed(&self) {} + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closing(&self) {} + fn closed(&self) {} - fn as_any(&self) -> &dyn Any { - self - } + fn as_any(&self) -> &dyn Any { + self } +} - struct TransportMulticastPeer { - manager: TransportManager, - handler: Arc, - transport: TransportMulticast, - } +struct TransportMulticastPeer { + manager: TransportManager, + handler: Arc, + transport: TransportMulticast, +} - async fn open_transport( - endpoint: &EndPoint, - ) -> (TransportMulticastPeer, TransportMulticastPeer) { - // Define peer01 and peer02 IDs - let peer01_id = ZenohId::try_from([1]).unwrap(); - let peer02_id = ZenohId::try_from([2]).unwrap(); - - // Create the peer01 transport manager - let peer01_handler = Arc::new(SHPeer::default()); - let peer01_manager = TransportManager::builder() - .zid(peer01_id) - .whatami(WhatAmI::Peer) - .build(peer01_handler.clone()) - .unwrap(); - - // Create the peer02 transport manager - let peer02_handler = Arc::new(SHPeer::default()); - let peer02_manager = TransportManager::builder() - .whatami(WhatAmI::Peer) - .zid(peer02_id) - .build(peer02_handler.clone()) - .unwrap(); - - // Create an empty transport with the peer01 - // Open transport -> This should be accepted - println!("Opening transport with {endpoint}"); - let _ = ztimeout!(peer01_manager.open_transport_multicast(endpoint.clone())).unwrap(); - assert!(!peer01_manager.get_transports_multicast().await.is_empty()); - println!("\t{:?}", peer01_manager.get_transports_multicast().await); - - println!("Opening transport with {endpoint}"); - let _ = ztimeout!(peer02_manager.open_transport_multicast(endpoint.clone())).unwrap(); - assert!(!peer02_manager.get_transports_multicast().await.is_empty()); - println!("\t{:?}", peer02_manager.get_transports_multicast().await); - - // Wait to for peer 01 
and 02 to join each other - ztimeout!(async { - while peer01_manager - .get_transport_multicast(&peer02_id) - .await - .is_none() - { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - let peer01_transport = peer01_manager +async fn open_transport(endpoint: &EndPoint) -> (TransportMulticastPeer, TransportMulticastPeer) { + // Define peer01 and peer02 IDs + let peer01_id = ZenohId::try_from([1]).unwrap(); + let peer02_id = ZenohId::try_from([2]).unwrap(); + + // Create the peer01 transport manager + let peer01_handler = Arc::new(SHPeer::default()); + let peer01_manager = TransportManager::builder() + .zid(peer01_id) + .whatami(WhatAmI::Peer) + .build(peer01_handler.clone()) + .unwrap(); + + // Create the peer02 transport manager + let peer02_handler = Arc::new(SHPeer::default()); + let peer02_manager = TransportManager::builder() + .whatami(WhatAmI::Peer) + .zid(peer02_id) + .build(peer02_handler.clone()) + .unwrap(); + + // Create an empty transport with the peer01 + // Open transport -> This should be accepted + println!("Opening transport with {endpoint}"); + let _ = ztimeout!(peer01_manager.open_transport_multicast(endpoint.clone())).unwrap(); + assert!(!peer01_manager.get_transports_multicast().await.is_empty()); + println!("\t{:?}", peer01_manager.get_transports_multicast().await); + + println!("Opening transport with {endpoint}"); + let _ = ztimeout!(peer02_manager.open_transport_multicast(endpoint.clone())).unwrap(); + assert!(!peer02_manager.get_transports_multicast().await.is_empty()); + println!("\t{:?}", peer02_manager.get_transports_multicast().await); + + // Wait to for peer 01 and 02 to join each other + ztimeout!(async { + while peer01_manager .get_transport_multicast(&peer02_id) .await - .unwrap(); - println!( - "\tPeer01 peers: {:?}", - peer01_transport.get_peers().unwrap() - ); - - ztimeout!(async { - while peer02_manager - .get_transport_multicast(&peer01_id) - .await - .is_none() - { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - let peer02_transport = peer02_manager + .is_none() + { + tokio::time::sleep(SLEEP_COUNT).await; + } + }); + let peer01_transport = peer01_manager + .get_transport_multicast(&peer02_id) + .await + .unwrap(); + println!( + "\tPeer01 peers: {:?}", + peer01_transport.get_peers().unwrap() + ); + + ztimeout!(async { + while peer02_manager .get_transport_multicast(&peer01_id) .await - .unwrap(); - println!( - "\tPeer02 peers: {:?}", - peer02_transport.get_peers().unwrap() - ); - - ( - TransportMulticastPeer { - manager: peer01_manager, - handler: peer01_handler, - transport: peer01_transport, - }, - TransportMulticastPeer { - manager: peer02_manager, - handler: peer02_handler, - transport: peer02_transport, - }, - ) - } - - async fn close_transport( - peer01: TransportMulticastPeer, - peer02: TransportMulticastPeer, - endpoint: &EndPoint, - ) { - // Close the peer01 transport - println!("Closing transport with {endpoint}"); - ztimeout!(peer01.transport.close()).unwrap(); - assert!(peer01.manager.get_transports_multicast().await.is_empty()); - ztimeout!(async { - while !peer02.transport.get_peers().unwrap().is_empty() { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - - // Close the peer02 transport - println!("Closing transport with {endpoint}"); - ztimeout!(peer02.transport.close()).unwrap(); - assert!(peer02.manager.get_transports_multicast().await.is_empty()); - - // Wait a little bit - tokio::time::sleep(SLEEP).await; - } - - async fn test_transport( - peer01: &TransportMulticastPeer, - peer02: &TransportMulticastPeer, - channel: 
Channel, - msg_size: usize, - ) { - // Create the message to send - let message: NetworkMessage = Push { - wire_expr: "test".into(), - ext_qos: QoSType::new(channel.priority, CongestionControl::Block, false), - ext_tstamp: None, - ext_nodeid: NodeIdType::default(), - payload: Put { - payload: vec![0u8; msg_size].into(), - timestamp: None, - encoding: Encoding::default(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: None, - ext_unknown: vec![], - } - .into(), + .is_none() + { + tokio::time::sleep(SLEEP_COUNT).await; } - .into(); + }); + let peer02_transport = peer02_manager + .get_transport_multicast(&peer01_id) + .await + .unwrap(); + println!( + "\tPeer02 peers: {:?}", + peer02_transport.get_peers().unwrap() + ); + + ( + TransportMulticastPeer { + manager: peer01_manager, + handler: peer01_handler, + transport: peer01_transport, + }, + TransportMulticastPeer { + manager: peer02_manager, + handler: peer02_handler, + transport: peer02_transport, + }, + ) +} - println!("Sending {MSG_COUNT} messages... {channel:?} {msg_size}"); - for _ in 0..MSG_COUNT { - peer01.transport.schedule(message.clone()).unwrap(); +async fn close_transport( + peer01: TransportMulticastPeer, + peer02: TransportMulticastPeer, + endpoint: &EndPoint, +) { + // Close the peer01 transport + println!("Closing transport with {endpoint}"); + ztimeout!(peer01.transport.close()).unwrap(); + assert!(peer01.manager.get_transports_multicast().await.is_empty()); + ztimeout!(async { + while !peer02.transport.get_peers().unwrap().is_empty() { + tokio::time::sleep(SLEEP_COUNT).await; } + }); - match channel.reliability { - Reliability::Reliable => { - ztimeout!(async { - while peer02.handler.get_count() != MSG_COUNT { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - } - Reliability::BestEffort => { - ztimeout!(async { - while peer02.handler.get_count() == 0 { - tokio::time::sleep(SLEEP_COUNT).await; - } - }); - } - }; + // Close the peer02 transport + println!("Closing transport with {endpoint}"); + ztimeout!(peer02.transport.close()).unwrap(); + assert!(peer02.manager.get_transports_multicast().await.is_empty()); - // Wait a little bit - tokio::time::sleep(SLEEP).await; + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn test_transport( + peer01: &TransportMulticastPeer, + peer02: &TransportMulticastPeer, + channel: Channel, + msg_size: usize, +) { + // Create the message to send + let message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: QoSType::new(channel.priority, CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::default(), + payload: Put { + payload: vec![0u8; msg_size].into(), + timestamp: None, + encoding: Encoding::default(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + } + .into(), } + .into(); - async fn run_single(endpoint: &EndPoint, channel: Channel, msg_size: usize) { - let (peer01, peer02) = open_transport(endpoint).await; - test_transport(&peer01, &peer02, channel, msg_size).await; + println!("Sending {MSG_COUNT} messages... 
{channel:?} {msg_size}"); + for _ in 0..MSG_COUNT { + peer01.transport.schedule(message.clone()).unwrap(); + } - #[cfg(feature = "stats")] - { - let stats = peer01.transport.get_stats().unwrap().report(); - println!("\tPeer 01: {:?}", stats); - let stats = peer02.transport.get_stats().unwrap().report(); - println!("\tPeer 02: {:?}", stats); + match channel.reliability { + Reliability::Reliable => { + ztimeout!(async { + while peer02.handler.get_count() != MSG_COUNT { + tokio::time::sleep(SLEEP_COUNT).await; + } + }); } + Reliability::BestEffort => { + ztimeout!(async { + while peer02.handler.get_count() == 0 { + tokio::time::sleep(SLEEP_COUNT).await; + } + }); + } + }; - close_transport(peer01, peer02, endpoint).await; + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn run_single(endpoint: &EndPoint, channel: Channel, msg_size: usize) { + let (peer01, peer02) = open_transport(endpoint).await; + test_transport(&peer01, &peer02, channel, msg_size).await; + + #[cfg(feature = "stats")] + { + let stats = peer01.transport.get_stats().unwrap().report(); + println!("\tPeer 01: {:?}", stats); + let stats = peer02.transport.get_stats().unwrap().report(); + println!("\tPeer 02: {:?}", stats); } - async fn run(endpoints: &[EndPoint], channel: &[Channel], msg_size: &[usize]) { - for e in endpoints.iter() { - for ch in channel.iter() { - for ms in msg_size.iter() { - run_single(e, *ch, *ms).await; - } + close_transport(peer01, peer02, endpoint).await; +} + +async fn run(endpoints: &[EndPoint], channel: &[Channel], msg_size: &[usize]) { + for e in endpoints.iter() { + for ch in channel.iter() { + for ms in msg_size.iter() { + run_single(e, *ch, *ms).await; } } } +} - #[cfg(all(feature = "transport_compression", feature = "transport_udp"))] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_multicast_udp_only() { - zenoh_util::try_init_log_from_env(); - - // Define the locator - let endpoints: Vec = vec![ - format!( - "udp/224.{}.{}.{}:20000", - rand::random::(), - rand::random::(), - rand::random::() - ) - .parse() - .unwrap(), - // Disabling by default because of no IPv6 support - // on GitHub CI actions. - // format!("udp/{}", ZN_MULTICAST_IPV6_ADDRESS_DEFAULT) - // .parse() - // .unwrap(), - ]; - // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::default(), - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; - // Run - run(&endpoints, &channel, &MSG_SIZE_NOFRAG).await; - } +#[cfg(all(feature = "transport_compression", feature = "transport_udp"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_multicast_udp_only() { + zenoh_util::try_init_log_from_env(); + + // Define the locator + let endpoints: Vec = vec![ + format!( + "udp/224.{}.{}.{}:20000", + rand::random::(), + rand::random::(), + rand::random::() + ) + .parse() + .unwrap(), + // Disabling by default because of no IPv6 support + // on GitHub CI actions. 
+ // format!("udp/{}", ZN_MULTICAST_IPV6_ADDRESS_DEFAULT) + // .parse() + // .unwrap(), + ]; + // Define the reliability and congestion control + let channel = [ + Channel { + priority: Priority::default(), + reliability: Reliability::BestEffort, + }, + Channel { + priority: Priority::RealTime, + reliability: Reliability::BestEffort, + }, + ]; + // Run + run(&endpoints, &channel, &MSG_SIZE_NOFRAG).await; } diff --git a/io/zenoh-transport/tests/unicast_multilink.rs b/io/zenoh-transport/tests/unicast_multilink.rs index d69a30ac9d..06338fcf25 100644 --- a/io/zenoh-transport/tests/unicast_multilink.rs +++ b/io/zenoh-transport/tests/unicast_multilink.rs @@ -11,533 +11,533 @@ // Contributors: // ZettaScale Zenoh Team, // -#[cfg(feature = "transport_multilink")] -mod tests { - use std::{convert::TryFrom, sync::Arc, time::Duration}; - use zenoh_core::ztimeout; - use zenoh_link::EndPoint; - use zenoh_protocol::core::{WhatAmI, ZenohId}; - use zenoh_result::ZResult; - use zenoh_transport::{ - multicast::TransportMulticast, unicast::TransportUnicast, DummyTransportPeerEventHandler, - TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, - TransportPeerEventHandler, - }; - - const TIMEOUT: Duration = Duration::from_secs(60); - const SLEEP: Duration = Duration::from_millis(100); - - #[cfg(test)] - #[derive(Default)] - struct SHRouterOpenClose; - - impl TransportEventHandler for SHRouterOpenClose { - fn new_unicast( - &self, - _peer: TransportPeer, - _transport: TransportUnicast, - ) -> ZResult> { - Ok(Arc::new(DummyTransportPeerEventHandler)) - } - - fn new_multicast( - &self, - _transport: TransportMulticast, - ) -> ZResult> { - panic!(); - } +#![cfg(feature = "transport_multilink")] + +use std::{convert::TryFrom, sync::Arc, time::Duration}; +use zenoh_core::ztimeout; +use zenoh_link::EndPoint; +use zenoh_protocol::core::{WhatAmI, ZenohId}; +use zenoh_result::ZResult; +use zenoh_transport::{ + multicast::TransportMulticast, unicast::TransportUnicast, DummyTransportPeerEventHandler, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_millis(100); + +#[cfg(test)] +#[derive(Default)] +struct SHRouterOpenClose; + +impl TransportEventHandler for SHRouterOpenClose { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + Ok(Arc::new(DummyTransportPeerEventHandler)) } - // Transport Handler for the client - struct SHClientOpenClose {} - - impl SHClientOpenClose { - fn new() -> Self { - Self {} - } + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); } +} - impl TransportEventHandler for SHClientOpenClose { - fn new_unicast( - &self, - _peer: TransportPeer, - _transport: TransportUnicast, - ) -> ZResult> { - Ok(Arc::new(DummyTransportPeerEventHandler)) - } +// Transport Handler for the client +struct SHClientOpenClose {} - fn new_multicast( - &self, - _transport: TransportMulticast, - ) -> ZResult> { - panic!(); - } +impl SHClientOpenClose { + fn new() -> Self { + Self {} } +} - async fn multilink_transport(endpoint: &EndPoint) { - /* [ROUTER] */ - let router_id = ZenohId::try_from([1]).unwrap(); - - let router_handler = Arc::new(SHRouterOpenClose); - // Create the router transport manager - let unicast = TransportManager::config_unicast() - .max_links(2) - .max_sessions(2); - let router_manager = TransportManager::builder() - 
.whatami(WhatAmI::Router) - .zid(router_id) - .unicast(unicast) - .build(router_handler.clone()) - .unwrap(); - - /* [CLIENT] */ - let client01_id = ZenohId::try_from([2]).unwrap(); - let client02_id = ZenohId::try_from([3]).unwrap(); - - // Create the transport transport manager for the first client - let unicast = TransportManager::config_unicast() - .max_links(2) - .max_sessions(1); - let client01_manager = TransportManager::builder() - .whatami(WhatAmI::Client) - .zid(client01_id) - .unicast(unicast) - .build(Arc::new(SHClientOpenClose::new())) - .unwrap(); - - // Create the transport transport manager for the second client - let unicast = TransportManager::config_unicast() - .max_links(1) - .max_sessions(1); - let client02_manager = TransportManager::builder() - .whatami(WhatAmI::Client) - .zid(client02_id) - .unicast(unicast) - .build(Arc::new(SHClientOpenClose::new())) - .unwrap(); +impl TransportEventHandler for SHClientOpenClose { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + Ok(Arc::new(DummyTransportPeerEventHandler)) + } - // Create the transport transport manager for the third client spoofing the first - let unicast = TransportManager::config_unicast() - .max_links(2) - .max_sessions(1); - let client03_manager = TransportManager::builder() - .whatami(WhatAmI::Client) - .zid(client01_id) - .unicast(unicast) - .build(Arc::new(SHClientOpenClose::new())) - .unwrap(); + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); + } +} - /* [1] */ - println!("\nTransport Open Close [1a1]"); - // Add the locator on the router - let res = ztimeout!(router_manager.add_listener(endpoint.clone())); - println!("Transport Open Close [1a1]: {res:?}"); - assert!(res.is_ok()); - println!("Transport Open Close [1a2]"); - let locators = router_manager.get_listeners().await; - println!("Transport Open Close [1a2]: {locators:?}"); - assert_eq!(locators.len(), 1); - - // Open a first transport from the client to the router - // -> This should be accepted - let mut links_num = 1; - - println!("Transport Open Close [1c1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [1c2]: {res:?}"); - assert!(res.is_ok()); - let c_ses1 = res.unwrap(); - println!("Transport Open Close [1d1]"); - let transports = client01_manager.get_transports_unicast().await; - println!("Transport Open Close [1d2]: {transports:?}"); - assert_eq!(transports.len(), 1); - assert_eq!(c_ses1.get_zid().unwrap(), router_id); - println!("Transport Open Close [1e1]"); - let links = c_ses1.get_links().unwrap(); - println!("Transport Open Close [1e2]: {links:?}"); - assert_eq!(links.len(), links_num); +async fn multilink_transport(endpoint: &EndPoint) { + /* [ROUTER] */ + let router_id = ZenohId::try_from([1]).unwrap(); + + let router_handler = Arc::new(SHRouterOpenClose); + // Create the router transport manager + let unicast = TransportManager::config_unicast() + .max_links(2) + .max_sessions(2); + let router_manager = TransportManager::builder() + .whatami(WhatAmI::Router) + .zid(router_id) + .unicast(unicast) + .build(router_handler.clone()) + .unwrap(); + + /* [CLIENT] */ + let client01_id = ZenohId::try_from([2]).unwrap(); + let client02_id = ZenohId::try_from([3]).unwrap(); + + // Create the transport transport manager for the first client + let unicast = TransportManager::config_unicast() + .max_links(2) + .max_sessions(1); + let client01_manager = TransportManager::builder() 
+ .whatami(WhatAmI::Client) + .zid(client01_id) + .unicast(unicast) + .build(Arc::new(SHClientOpenClose::new())) + .unwrap(); + + // Create the transport transport manager for the second client + let unicast = TransportManager::config_unicast() + .max_links(1) + .max_sessions(1); + let client02_manager = TransportManager::builder() + .whatami(WhatAmI::Client) + .zid(client02_id) + .unicast(unicast) + .build(Arc::new(SHClientOpenClose::new())) + .unwrap(); + + // Create the transport transport manager for the third client spoofing the first + let unicast = TransportManager::config_unicast() + .max_links(2) + .max_sessions(1); + let client03_manager = TransportManager::builder() + .whatami(WhatAmI::Client) + .zid(client01_id) + .unicast(unicast) + .build(Arc::new(SHClientOpenClose::new())) + .unwrap(); + + /* [1] */ + println!("\nTransport Open Close [1a1]"); + // Add the locator on the router + let res = ztimeout!(router_manager.add_listener(endpoint.clone())); + println!("Transport Open Close [1a1]: {res:?}"); + assert!(res.is_ok()); + println!("Transport Open Close [1a2]"); + let locators = router_manager.get_listeners().await; + println!("Transport Open Close [1a2]: {locators:?}"); + assert_eq!(locators.len(), 1); + + // Open a first transport from the client to the router + // -> This should be accepted + let mut links_num = 1; + + println!("Transport Open Close [1c1]"); + let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [1c2]: {res:?}"); + assert!(res.is_ok()); + let c_ses1 = res.unwrap(); + println!("Transport Open Close [1d1]"); + let transports = client01_manager.get_transports_unicast().await; + println!("Transport Open Close [1d2]: {transports:?}"); + assert_eq!(transports.len(), 1); + assert_eq!(c_ses1.get_zid().unwrap(), router_id); + println!("Transport Open Close [1e1]"); + let links = c_ses1.get_links().unwrap(); + println!("Transport Open Close [1e2]: {links:?}"); + assert_eq!(links.len(), links_num); + + // Verify that the transport has been open on the router + println!("Transport Open Close [1f1]"); + ztimeout!(async { + loop { + let transports = router_manager.get_transports_unicast().await; + let s = transports + .iter() + .find(|s| s.get_zid().unwrap() == client01_id); - // Verify that the transport has been open on the router - println!("Transport Open Close [1f1]"); - ztimeout!(async { - loop { - let transports = router_manager.get_transports_unicast().await; - let s = transports - .iter() - .find(|s| s.get_zid().unwrap() == client01_id); - - match s { - Some(s) => { - let links = s.get_links().unwrap(); - assert_eq!(links.len(), links_num); - break; - } - None => tokio::time::sleep(SLEEP).await, - } - } - }); - - /* [2] */ - // Open a second transport from the client to the router - // -> This should be accepted - links_num = 2; - - println!("\nTransport Open Close [2a1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [2a2]: {res:?}"); - assert!(res.is_ok()); - let c_ses2 = res.unwrap(); - println!("Transport Open Close [2b1]"); - let transports = client01_manager.get_transports_unicast().await; - println!("Transport Open Close [2b2]: {transports:?}"); - assert_eq!(transports.len(), 1); - assert_eq!(c_ses2.get_zid().unwrap(), router_id); - println!("Transport Open Close [2c1]"); - let links = c_ses2.get_links().unwrap(); - println!("Transport Open Close [2c2]: {links:?}"); - assert_eq!(links.len(), links_num); - assert_eq!(c_ses2, 
c_ses1); - - // Verify that the transport has been open on the router - println!("Transport Open Close [2d1]"); - ztimeout!(async { - loop { - let transports = router_manager.get_transports_unicast().await; - let s = transports - .iter() - .find(|s| s.get_zid().unwrap() == client01_id) - .unwrap(); - - let links = s.get_links().unwrap(); - if links.len() == links_num { + match s { + Some(s) => { + let links = s.get_links().unwrap(); + assert_eq!(links.len(), links_num); break; } - tokio::time::sleep(SLEEP).await; + None => tokio::time::sleep(SLEEP).await, } - }); - - /* [3] */ - // Open transport -> This should be rejected because - // of the maximum limit of links per transport - println!("\nTransport Open Close [3a1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [3a2]: {res:?}"); - assert!(res.is_err()); - println!("Transport Open Close [3b1]"); - let transports = client01_manager.get_transports_unicast().await; - println!("Transport Open Close [3b2]: {transports:?}"); - assert_eq!(transports.len(), 1); - assert_eq!(c_ses1.get_zid().unwrap(), router_id); - println!("Transport Open Close [3c1]"); - let links = c_ses1.get_links().unwrap(); - println!("Transport Open Close [3c2]: {links:?}"); - assert_eq!(links.len(), links_num); - - // Verify that the transport has not been open on the router - println!("Transport Open Close [3d1]"); - ztimeout!(async { - tokio::time::sleep(SLEEP).await; + } + }); + + /* [2] */ + // Open a second transport from the client to the router + // -> This should be accepted + links_num = 2; + + println!("\nTransport Open Close [2a1]"); + let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [2a2]: {res:?}"); + assert!(res.is_ok()); + let c_ses2 = res.unwrap(); + println!("Transport Open Close [2b1]"); + let transports = client01_manager.get_transports_unicast().await; + println!("Transport Open Close [2b2]: {transports:?}"); + assert_eq!(transports.len(), 1); + assert_eq!(c_ses2.get_zid().unwrap(), router_id); + println!("Transport Open Close [2c1]"); + let links = c_ses2.get_links().unwrap(); + println!("Transport Open Close [2c2]: {links:?}"); + assert_eq!(links.len(), links_num); + assert_eq!(c_ses2, c_ses1); + + // Verify that the transport has been open on the router + println!("Transport Open Close [2d1]"); + ztimeout!(async { + loop { let transports = router_manager.get_transports_unicast().await; - assert_eq!(transports.len(), 1); let s = transports .iter() .find(|s| s.get_zid().unwrap() == client01_id) .unwrap(); + let links = s.get_links().unwrap(); - assert_eq!(links.len(), links_num); - }); - - /* [4] */ - // Close the open transport on the client - println!("\nTransport Open Close [4a1]"); - let res = ztimeout!(c_ses1.close()); - println!("Transport Open Close [4a2]: {res:?}"); - assert!(res.is_ok()); - println!("Transport Open Close [4b1]"); - let transports = client01_manager.get_transports_unicast().await; - println!("Transport Open Close [4b2]: {transports:?}"); - assert_eq!(transports.len(), 0); - - // Verify that the transport has been closed also on the router - println!("Transport Open Close [4c1]"); - ztimeout!(async { - loop { - let transports = router_manager.get_transports_unicast().await; - let index = transports - .iter() - .find(|s| s.get_zid().unwrap() == client01_id); - if index.is_none() { - break; - } - tokio::time::sleep(SLEEP).await; + if links.len() == links_num { + break; } - }); - - /* [5] */ - // 
Open transport -> This should be accepted because - // the number of links should be back to 0 - links_num = 1; - - println!("\nTransport Open Close [5a1]"); - let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [5a2]: {res:?}"); - assert!(res.is_ok()); - let c_ses3 = res.unwrap(); - println!("Transport Open Close [5b1]"); - let transports = client01_manager.get_transports_unicast().await; - println!("Transport Open Close [5b2]: {transports:?}"); + tokio::time::sleep(SLEEP).await; + } + }); + + /* [3] */ + // Open transport -> This should be rejected because + // of the maximum limit of links per transport + println!("\nTransport Open Close [3a1]"); + let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [3a2]: {res:?}"); + assert!(res.is_err()); + println!("Transport Open Close [3b1]"); + let transports = client01_manager.get_transports_unicast().await; + println!("Transport Open Close [3b2]: {transports:?}"); + assert_eq!(transports.len(), 1); + assert_eq!(c_ses1.get_zid().unwrap(), router_id); + println!("Transport Open Close [3c1]"); + let links = c_ses1.get_links().unwrap(); + println!("Transport Open Close [3c2]: {links:?}"); + assert_eq!(links.len(), links_num); + + // Verify that the transport has not been open on the router + println!("Transport Open Close [3d1]"); + ztimeout!(async { + tokio::time::sleep(SLEEP).await; + let transports = router_manager.get_transports_unicast().await; assert_eq!(transports.len(), 1); - assert_eq!(c_ses3.get_zid().unwrap(), router_id); - println!("Transport Open Close [5c1]"); - let links = c_ses3.get_links().unwrap(); - println!("Transport Open Close [5c2]: {links:?}"); + let s = transports + .iter() + .find(|s| s.get_zid().unwrap() == client01_id) + .unwrap(); + let links = s.get_links().unwrap(); assert_eq!(links.len(), links_num); - - // Verify that the transport has been open on the router - println!("Transport Open Close [5d1]"); - ztimeout!(async { - tokio::time::sleep(SLEEP).await; + }); + + /* [4] */ + // Close the open transport on the client + println!("\nTransport Open Close [4a1]"); + let res = ztimeout!(c_ses1.close()); + println!("Transport Open Close [4a2]: {res:?}"); + assert!(res.is_ok()); + println!("Transport Open Close [4b1]"); + let transports = client01_manager.get_transports_unicast().await; + println!("Transport Open Close [4b2]: {transports:?}"); + assert_eq!(transports.len(), 0); + + // Verify that the transport has been closed also on the router + println!("Transport Open Close [4c1]"); + ztimeout!(async { + loop { let transports = router_manager.get_transports_unicast().await; - assert_eq!(transports.len(), 1); - let s = transports + let index = transports .iter() - .find(|s| s.get_zid().unwrap() == client01_id) - .unwrap(); - let links = s.get_links().unwrap(); - assert_eq!(links.len(), links_num); - }); - - /* [6] */ - // Open transport -> This should be rejected because - // of the maximum limit of transports - println!("\nTransport Open Close [6a1]"); - let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [6a2]: {res:?}"); - assert!(res.is_ok()); - let c_ses4 = res.unwrap(); - println!("Transport Open Close [6b1]"); - let transports = client02_manager.get_transports_unicast().await; - println!("Transport Open Close [6b2]: {transports:?}"); + .find(|s| s.get_zid().unwrap() == client01_id); + if index.is_none() { + break; + } + 
tokio::time::sleep(SLEEP).await; + } + }); + + /* [5] */ + // Open transport -> This should be accepted because + // the number of links should be back to 0 + links_num = 1; + + println!("\nTransport Open Close [5a1]"); + let res = ztimeout!(client01_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [5a2]: {res:?}"); + assert!(res.is_ok()); + let c_ses3 = res.unwrap(); + println!("Transport Open Close [5b1]"); + let transports = client01_manager.get_transports_unicast().await; + println!("Transport Open Close [5b2]: {transports:?}"); + assert_eq!(transports.len(), 1); + assert_eq!(c_ses3.get_zid().unwrap(), router_id); + println!("Transport Open Close [5c1]"); + let links = c_ses3.get_links().unwrap(); + println!("Transport Open Close [5c2]: {links:?}"); + assert_eq!(links.len(), links_num); + + // Verify that the transport has been open on the router + println!("Transport Open Close [5d1]"); + ztimeout!(async { + tokio::time::sleep(SLEEP).await; + let transports = router_manager.get_transports_unicast().await; assert_eq!(transports.len(), 1); - assert_eq!(c_ses4.get_zid().unwrap(), router_id); - println!("Transport Open Close [6c1]"); - let links = c_ses4.get_links().unwrap(); - println!("Transport Open Close [6c2]: {links:?}"); + let s = transports + .iter() + .find(|s| s.get_zid().unwrap() == client01_id) + .unwrap(); + let links = s.get_links().unwrap(); assert_eq!(links.len(), links_num); - - // Open transport -> This should be rejected because - // of the maximum limit of transports - println!("\nTransport Open Close [6d1]"); - let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [6d2]: {res:?}"); - assert!(res.is_err()); - println!("Transport Open Close [6e1]"); - let transports = client02_manager.get_transports_unicast().await; - println!("Transport Open Close [6e2]: {transports:?}"); - assert_eq!(transports.len(), 1); - - // Verify that the transport has been open on the router - println!("Transport Open Close [6f1]"); - ztimeout!(async { + }); + + /* [6] */ + // Open transport -> This should be rejected because + // of the maximum limit of transports + println!("\nTransport Open Close [6a1]"); + let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [6a2]: {res:?}"); + assert!(res.is_ok()); + let c_ses4 = res.unwrap(); + println!("Transport Open Close [6b1]"); + let transports = client02_manager.get_transports_unicast().await; + println!("Transport Open Close [6b2]: {transports:?}"); + assert_eq!(transports.len(), 1); + assert_eq!(c_ses4.get_zid().unwrap(), router_id); + println!("Transport Open Close [6c1]"); + let links = c_ses4.get_links().unwrap(); + println!("Transport Open Close [6c2]: {links:?}"); + assert_eq!(links.len(), links_num); + + // Open transport -> This should be rejected because + // of the maximum limit of transports + println!("\nTransport Open Close [6d1]"); + let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [6d2]: {res:?}"); + assert!(res.is_err()); + println!("Transport Open Close [6e1]"); + let transports = client02_manager.get_transports_unicast().await; + println!("Transport Open Close [6e2]: {transports:?}"); + assert_eq!(transports.len(), 1); + + // Verify that the transport has been open on the router + println!("Transport Open Close [6f1]"); + ztimeout!(async { + tokio::time::sleep(SLEEP).await; + let transports = 
router_manager.get_transports_unicast().await; + assert_eq!(transports.len(), 2); + let s = transports + .iter() + .find(|s| s.get_zid().unwrap() == client01_id) + .unwrap(); + let links = s.get_links().unwrap(); + assert_eq!(links.len(), links_num); + }); + + /* [7] */ + // Try to spoof the first client + // -> This should be rejected + println!("\nTransport Open Close [7a1]"); + let res = ztimeout!(client03_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [7a2]: {res:?}"); + assert!(res.is_err()); + println!("Transport Open Close [7b1]"); + let transports = client03_manager.get_transports_unicast().await; + println!("Transport Open Close [7b2]: {transports:?}"); + assert_eq!(transports.len(), 0); + + /* [8] */ + // Close the open transport on the client + println!("\nTransport Open Close [8a1]"); + let res = ztimeout!(c_ses3.close()); + println!("Transport Open Close [8a2]: {res:?}"); + assert!(res.is_ok()); + println!("\nTransport Open Close [8b1]"); + let res = ztimeout!(c_ses4.close()); + println!("Transport Open Close [8b2]: {res:?}"); + assert!(res.is_ok()); + println!("Transport Open Close [8c1]"); + let transports = client01_manager.get_transports_unicast().await; + println!("Transport Open Close [8c2]: {transports:?}"); + assert_eq!(transports.len(), 0); + + // Verify that the transport has been closed also on the router + println!("Transport Open Close [8d1]"); + ztimeout!(async { + loop { + let transports = router_manager.get_transports_unicast().await; + if transports.is_empty() { + break; + } tokio::time::sleep(SLEEP).await; + } + }); + + /* [9] */ + // Open transport -> This should be accepted because + // the number of transports should be back to 0 + links_num = 1; + + println!("\nTransport Open Close [9a1]"); + let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); + println!("Transport Open Close [9a2]: {res:?}"); + assert!(res.is_ok()); + let c_ses4 = res.unwrap(); + println!("Transport Open Close [9b1]"); + let transports = client02_manager.get_transports_unicast().await; + println!("Transport Open Close [9b2]: {transports:?}"); + assert_eq!(transports.len(), 1); + println!("Transport Open Close [9c1]"); + let links = c_ses4.get_links().unwrap(); + println!("Transport Open Close [9c2]: {links:?}"); + assert_eq!(links.len(), links_num); + + // Verify that the transport has been open on the router + println!("Transport Open Close [9d1]"); + ztimeout!(async { + loop { let transports = router_manager.get_transports_unicast().await; - assert_eq!(transports.len(), 2); let s = transports .iter() - .find(|s| s.get_zid().unwrap() == client01_id) - .unwrap(); - let links = s.get_links().unwrap(); - assert_eq!(links.len(), links_num); - }); - - /* [7] */ - // Try to spoof the first client - // -> This should be rejected - println!("\nTransport Open Close [7a1]"); - let res = ztimeout!(client03_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [7a2]: {res:?}"); - assert!(res.is_err()); - println!("Transport Open Close [7b1]"); - let transports = client03_manager.get_transports_unicast().await; - println!("Transport Open Close [7b2]: {transports:?}"); - assert_eq!(transports.len(), 0); - - /* [8] */ - // Close the open transport on the client - println!("\nTransport Open Close [8a1]"); - let res = ztimeout!(c_ses3.close()); - println!("Transport Open Close [8a2]: {res:?}"); - assert!(res.is_ok()); - println!("\nTransport Open Close [8b1]"); - let res = ztimeout!(c_ses4.close()); - 
println!("Transport Open Close [8b2]: {res:?}"); - assert!(res.is_ok()); - println!("Transport Open Close [8c1]"); - let transports = client01_manager.get_transports_unicast().await; - println!("Transport Open Close [8c2]: {transports:?}"); - assert_eq!(transports.len(), 0); - - // Verify that the transport has been closed also on the router - println!("Transport Open Close [8d1]"); - ztimeout!(async { - loop { - let transports = router_manager.get_transports_unicast().await; - if transports.is_empty() { - break; - } - tokio::time::sleep(SLEEP).await; - } - }); - - /* [9] */ - // Open transport -> This should be accepted because - // the number of transports should be back to 0 - links_num = 1; - - println!("\nTransport Open Close [9a1]"); - let res = ztimeout!(client02_manager.open_transport_unicast(endpoint.clone())); - println!("Transport Open Close [9a2]: {res:?}"); - assert!(res.is_ok()); - let c_ses4 = res.unwrap(); - println!("Transport Open Close [9b1]"); - let transports = client02_manager.get_transports_unicast().await; - println!("Transport Open Close [9b2]: {transports:?}"); - assert_eq!(transports.len(), 1); - println!("Transport Open Close [9c1]"); - let links = c_ses4.get_links().unwrap(); - println!("Transport Open Close [9c2]: {links:?}"); - assert_eq!(links.len(), links_num); - - // Verify that the transport has been open on the router - println!("Transport Open Close [9d1]"); - ztimeout!(async { - loop { - let transports = router_manager.get_transports_unicast().await; - let s = transports - .iter() - .find(|s| s.get_zid().unwrap() == client02_id); - match s { - Some(s) => { - let links = s.get_links().unwrap(); - assert_eq!(links.len(), links_num); - break; - } - None => tokio::time::sleep(SLEEP).await, - } - } - }); - - /* [9] */ - // Close the open transport on the client - println!("Transport Open Close [9a1]"); - let res = ztimeout!(c_ses4.close()); - println!("Transport Open Close [9a2]: {res:?}"); - assert!(res.is_ok()); - println!("Transport Open Close [9b1]"); - let transports = client02_manager.get_transports_unicast().await; - println!("Transport Open Close [9b2]: {transports:?}"); - assert_eq!(transports.len(), 0); - - // Verify that the transport has been closed also on the router - println!("Transport Open Close [9c1]"); - ztimeout!(async { - loop { - let transports = router_manager.get_transports_unicast().await; - if transports.is_empty() { + .find(|s| s.get_zid().unwrap() == client02_id); + match s { + Some(s) => { + let links = s.get_links().unwrap(); + assert_eq!(links.len(), links_num); break; } - tokio::time::sleep(SLEEP).await; + None => tokio::time::sleep(SLEEP).await, } - }); - - /* [10] */ - // Perform clean up of the open locators - println!("\nTransport Open Close [10a1]"); - let res = ztimeout!(router_manager.del_listener(endpoint)); - println!("Transport Open Close [10a2]: {res:?}"); - assert!(res.is_ok()); - - ztimeout!(async { - while !router_manager.get_listeners().await.is_empty() { - tokio::time::sleep(SLEEP).await; + } + }); + + /* [9] */ + // Close the open transport on the client + println!("Transport Open Close [9a1]"); + let res = ztimeout!(c_ses4.close()); + println!("Transport Open Close [9a2]: {res:?}"); + assert!(res.is_ok()); + println!("Transport Open Close [9b1]"); + let transports = client02_manager.get_transports_unicast().await; + println!("Transport Open Close [9b2]: {transports:?}"); + assert_eq!(transports.len(), 0); + + // Verify that the transport has been closed also on the router + println!("Transport Open Close 
[9c1]"); + ztimeout!(async { + loop { + let transports = router_manager.get_transports_unicast().await; + if transports.is_empty() { + break; } - }); + tokio::time::sleep(SLEEP).await; + } + }); - // Wait a little bit - tokio::time::sleep(SLEEP).await; + /* [10] */ + // Perform clean up of the open locators + println!("\nTransport Open Close [10a1]"); + let res = ztimeout!(router_manager.del_listener(endpoint)); + println!("Transport Open Close [10a2]: {res:?}"); + assert!(res.is_ok()); - ztimeout!(router_manager.close()); - ztimeout!(client01_manager.close()); - ztimeout!(client02_manager.close()); + ztimeout!(async { + while !router_manager.get_listeners().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); - // Wait a little bit - tokio::time::sleep(SLEEP).await; - } + // Wait a little bit + tokio::time::sleep(SLEEP).await; - #[cfg(feature = "transport_tcp")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn multilink_tcp_only() { - zenoh_util::try_init_log_from_env(); + ztimeout!(router_manager.close()); + ztimeout!(client01_manager.close()); + ztimeout!(client02_manager.close()); - let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 18000).parse().unwrap(); - multilink_transport(&endpoint).await; - } + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} - #[cfg(feature = "transport_udp")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn multilink_udp_only() { - zenoh_util::try_init_log_from_env(); +#[cfg(feature = "transport_tcp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multilink_tcp_only() { + zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = format!("udp/127.0.0.1:{}", 18010).parse().unwrap(); - multilink_transport(&endpoint).await; - } + let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 18000).parse().unwrap(); + multilink_transport(&endpoint).await; +} - #[cfg(feature = "transport_ws")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - #[ignore] - async fn multilink_ws_only() { - zenoh_util::try_init_log_from_env(); +#[cfg(feature = "transport_udp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multilink_udp_only() { + zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 18020).parse().unwrap(); - multilink_transport(&endpoint).await; - } + let endpoint: EndPoint = format!("udp/127.0.0.1:{}", 18010).parse().unwrap(); + multilink_transport(&endpoint).await; +} - #[cfg(feature = "transport_unixpipe")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - #[ignore] - async fn multilink_unixpipe_only() { - zenoh_util::try_init_log_from_env(); +#[cfg(feature = "transport_ws")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore] +async fn multilink_ws_only() { + zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = "unixpipe/multilink_unixpipe_only".parse().unwrap(); - multilink_transport(&endpoint).await; - } + let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 18020).parse().unwrap(); + multilink_transport(&endpoint).await; +} - #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - #[ignore] - async fn multilink_unix_only() { - zenoh_util::try_init_log_from_env(); - - let f1 = "zenoh-test-unix-socket-9.sock"; - let _ = std::fs::remove_file(f1); - let endpoint: EndPoint = format!("unixsock-stream/{f1}").parse().unwrap(); - multilink_transport(&endpoint).await; - let _ = 
std::fs::remove_file(f1); - let _ = std::fs::remove_file(format!("{f1}.lock")); - } +#[cfg(feature = "transport_unixpipe")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore] +async fn multilink_unixpipe_only() { + zenoh_util::try_init_log_from_env(); - #[cfg(feature = "transport_tls")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn multilink_tls_only() { - use zenoh_link::tls::config::*; + let endpoint: EndPoint = "unixpipe/multilink_unixpipe_only".parse().unwrap(); + multilink_transport(&endpoint).await; +} + +#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore] +async fn multilink_unix_only() { + zenoh_util::try_init_log_from_env(); + + let f1 = "zenoh-test-unix-socket-9.sock"; + let _ = std::fs::remove_file(f1); + let endpoint: EndPoint = format!("unixsock-stream/{f1}").parse().unwrap(); + multilink_transport(&endpoint).await; + let _ = std::fs::remove_file(f1); + let _ = std::fs::remove_file(format!("{f1}.lock")); +} - zenoh_util::try_init_log_from_env(); +#[cfg(feature = "transport_tls")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multilink_tls_only() { + use zenoh_link::tls::config::*; - // NOTE: this an auto-generated pair of certificate and key. - // The target domain is localhost, so it has no real - // mapping to any existing domain. The certificate and key - // have been generated using: https://github.com/jsha/minica - let key = "-----BEGIN RSA PRIVATE KEY----- + zenoh_util::try_init_log_from_env(); + + // NOTE: this an auto-generated pair of certificate and key. + // The target domain is localhost, so it has no real + // mapping to any existing domain. The certificate and key + // have been generated using: https://github.com/jsha/minica + let key = "-----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAsfqAuhElN4HnyeqLovSd4Qe+nNv5AwCjSO+HFiF30x3vQ1Hi qRA0UmyFlSqBnFH3TUHm4Jcad40QfrX8f11NKGZdpvKHsMYqYjZnYkRFGS2s4fQy aDbV5M06s3UDX8ETPgY41Y8fCKTSVdi9iHkwcVrXMxUu4IBBx0C1r2GSo3gkIBnU @@ -565,7 +565,7 @@ tYsqC2FtWzY51VOEKNpnfH7zH5n+bjoI9nAEAW63TK9ZKkr2hRGsDhJdGzmLfQ7v F6/CuIw9EsAq6qIB8O88FXQqald+BZOx6AzB8Oedsz/WtMmIEmr/+Q== -----END RSA PRIVATE KEY-----"; - let cert = "-----BEGIN CERTIFICATE----- + let cert = "-----BEGIN CERTIFICATE----- MIIDLjCCAhagAwIBAgIIeUtmIdFQznMwDQYJKoZIhvcNAQELBQAwIDEeMBwGA1UE AxMVbWluaWNhIHJvb3QgY2EgMDc4ZGE3MCAXDTIzMDMwNjE2MDMxOFoYDzIxMjMw MzA2MTYwMzE4WjAUMRIwEAYDVQQDEwlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEB @@ -586,8 +586,8 @@ p5e60QweRuJsb60aUaCG8HoICevXYK2fFqCQdlb5sIqQqXyN2K6HuKAFywsjsGyJ abY= -----END CERTIFICATE-----"; - // Configure the client - let ca = "-----BEGIN CERTIFICATE----- + // Configure the client + let ca = "-----BEGIN CERTIFICATE----- MIIDSzCCAjOgAwIBAgIIB42n1ZIkOakwDQYJKoZIhvcNAQELBQAwIDEeMBwGA1UE AxMVbWluaWNhIHJvb3QgY2EgMDc4ZGE3MCAXDTIzMDMwNjE2MDMwN1oYDzIxMjMw MzA2MTYwMzA3WjAgMR4wHAYDVQQDExVtaW5pY2Egcm9vdCBjYSAwNzhkYTcwggEi @@ -608,33 +608,33 @@ Ck0v2xSPAiVjg6w65rUQeW6uB5m0T2wyj+wm0At8vzhZPlgS1fKhcmT2dzOq3+oN R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== -----END CERTIFICATE-----"; - let mut endpoint: EndPoint = format!("tls/localhost:{}", 18030).parse().unwrap(); - endpoint - .config_mut() - .extend( - [ - (TLS_ROOT_CA_CERTIFICATE_RAW, ca), - (TLS_SERVER_PRIVATE_KEY_RAW, key), - (TLS_SERVER_CERTIFICATE_RAW, cert), - ] - .iter() - .map(|(k, v)| ((*k).to_owned(), (*v).to_owned())), - ) - .unwrap(); - - multilink_transport(&endpoint).await; - } + let mut endpoint: EndPoint = 
format!("tls/localhost:{}", 18030).parse().unwrap(); + endpoint + .config_mut() + .extend( + [ + (TLS_ROOT_CA_CERTIFICATE_RAW, ca), + (TLS_SERVER_PRIVATE_KEY_RAW, key), + (TLS_SERVER_CERTIFICATE_RAW, cert), + ] + .iter() + .map(|(k, v)| ((*k).to_owned(), (*v).to_owned())), + ) + .unwrap(); + + multilink_transport(&endpoint).await; +} - #[cfg(feature = "transport_quic")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn multilink_quic_only() { - use zenoh_link::quic::config::*; +#[cfg(feature = "transport_quic")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multilink_quic_only() { + use zenoh_link::quic::config::*; - // NOTE: this an auto-generated pair of certificate and key. - // The target domain is localhost, so it has no real - // mapping to any existing domain. The certificate and key - // have been generated using: https://github.com/jsha/minica - let key = "-----BEGIN RSA PRIVATE KEY----- + // NOTE: this an auto-generated pair of certificate and key. + // The target domain is localhost, so it has no real + // mapping to any existing domain. The certificate and key + // have been generated using: https://github.com/jsha/minica + let key = "-----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAsfqAuhElN4HnyeqLovSd4Qe+nNv5AwCjSO+HFiF30x3vQ1Hi qRA0UmyFlSqBnFH3TUHm4Jcad40QfrX8f11NKGZdpvKHsMYqYjZnYkRFGS2s4fQy aDbV5M06s3UDX8ETPgY41Y8fCKTSVdi9iHkwcVrXMxUu4IBBx0C1r2GSo3gkIBnU @@ -662,7 +662,7 @@ tYsqC2FtWzY51VOEKNpnfH7zH5n+bjoI9nAEAW63TK9ZKkr2hRGsDhJdGzmLfQ7v F6/CuIw9EsAq6qIB8O88FXQqald+BZOx6AzB8Oedsz/WtMmIEmr/+Q== -----END RSA PRIVATE KEY-----"; - let cert = "-----BEGIN CERTIFICATE----- + let cert = "-----BEGIN CERTIFICATE----- MIIDLjCCAhagAwIBAgIIeUtmIdFQznMwDQYJKoZIhvcNAQELBQAwIDEeMBwGA1UE AxMVbWluaWNhIHJvb3QgY2EgMDc4ZGE3MCAXDTIzMDMwNjE2MDMxOFoYDzIxMjMw MzA2MTYwMzE4WjAUMRIwEAYDVQQDEwlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEB @@ -683,8 +683,8 @@ p5e60QweRuJsb60aUaCG8HoICevXYK2fFqCQdlb5sIqQqXyN2K6HuKAFywsjsGyJ abY= -----END CERTIFICATE-----"; - // Configure the client - let ca = "-----BEGIN CERTIFICATE----- + // Configure the client + let ca = "-----BEGIN CERTIFICATE----- MIIDSzCCAjOgAwIBAgIIB42n1ZIkOakwDQYJKoZIhvcNAQELBQAwIDEeMBwGA1UE AxMVbWluaWNhIHJvb3QgY2EgMDc4ZGE3MCAXDTIzMDMwNjE2MDMwN1oYDzIxMjMw MzA2MTYwMzA3WjAgMR4wHAYDVQQDExVtaW5pY2Egcm9vdCBjYSAwNzhkYTcwggEi @@ -705,30 +705,29 @@ Ck0v2xSPAiVjg6w65rUQeW6uB5m0T2wyj+wm0At8vzhZPlgS1fKhcmT2dzOq3+oN R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== -----END CERTIFICATE-----"; - // Define the locator - let mut endpoint: EndPoint = format!("quic/localhost:{}", 18040).parse().unwrap(); - endpoint - .config_mut() - .extend( - [ - (TLS_ROOT_CA_CERTIFICATE_RAW, ca), - (TLS_SERVER_PRIVATE_KEY_RAW, key), - (TLS_SERVER_CERTIFICATE_RAW, cert), - ] - .iter() - .map(|(k, v)| ((*k).to_owned(), (*v).to_owned())), - ) - .unwrap(); - - multilink_transport(&endpoint).await; - } + // Define the locator + let mut endpoint: EndPoint = format!("quic/localhost:{}", 18040).parse().unwrap(); + endpoint + .config_mut() + .extend( + [ + (TLS_ROOT_CA_CERTIFICATE_RAW, ca), + (TLS_SERVER_PRIVATE_KEY_RAW, key), + (TLS_SERVER_CERTIFICATE_RAW, cert), + ] + .iter() + .map(|(k, v)| ((*k).to_owned(), (*v).to_owned())), + ) + .unwrap(); + + multilink_transport(&endpoint).await; +} - #[cfg(all(feature = "transport_vsock", target_os = "linux"))] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn multilink_vsock_only() { - zenoh_util::try_init_log_from_env(); +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] 
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multilink_vsock_only() { + zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); - multilink_transport(&endpoint).await; - } + let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); + multilink_transport(&endpoint).await; } diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index d04a54c63a..f193af1df2 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -11,417 +11,415 @@ // Contributors: // ZettaScale Zenoh Team, // -#[cfg(feature = "shared-memory")] -mod tests { - use rand::{Rng, SeedableRng}; - use std::{ - any::Any, - convert::TryFrom, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, - }; - use zenoh_buffers::buffer::SplitBuffer; - use zenoh_core::ztimeout; - use zenoh_crypto::PseudoRng; - use zenoh_link::Link; - use zenoh_protocol::{ - core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohId}, - network::{ - push::ext::{NodeIdType, QoSType}, - NetworkBody, NetworkMessage, Push, - }, - zenoh::{PushBody, Put}, - }; - use zenoh_result::{zerror, ZResult}; - use zenoh_shm::{SharedMemoryBuf, SharedMemoryManager}; - use zenoh_transport::{ - multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, - TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, - }; - - const TIMEOUT: Duration = Duration::from_secs(60); - const SLEEP: Duration = Duration::from_secs(1); - const USLEEP: Duration = Duration::from_micros(100); - - const MSG_COUNT: usize = 1_000; - const MSG_SIZE: usize = 1_024; - - // Transport Handler for the router - struct SHPeer { - count: Arc, - is_shm: bool, - } - - impl SHPeer { - fn new(is_shm: bool) -> Self { - Self { - count: Arc::new(AtomicUsize::new(0)), - is_shm, - } - } +#![cfg(feature = "shared-memory")] + +use rand::{Rng, SeedableRng}; +use std::{ + any::Any, + convert::TryFrom, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; +use zenoh_buffers::buffer::SplitBuffer; +use zenoh_core::ztimeout; +use zenoh_crypto::PseudoRng; +use zenoh_link::Link; +use zenoh_protocol::{ + core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohId}, + network::{ + push::ext::{NodeIdType, QoSType}, + NetworkBody, NetworkMessage, Push, + }, + zenoh::{PushBody, Put}, +}; +use zenoh_result::{zerror, ZResult}; +use zenoh_shm::{SharedMemoryBuf, SharedMemoryManager}; +use zenoh_transport::{ + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); +const USLEEP: Duration = Duration::from_micros(100); + +const MSG_COUNT: usize = 1_000; +const MSG_SIZE: usize = 1_024; + +// Transport Handler for the router +struct SHPeer { + count: Arc, + is_shm: bool, +} - fn get_count(&self) -> usize { - self.count.load(Ordering::SeqCst) +impl SHPeer { + fn new(is_shm: bool) -> Self { + Self { + count: Arc::new(AtomicUsize::new(0)), + is_shm, } } - impl TransportEventHandler for SHPeer { - fn new_unicast( - &self, - _peer: TransportPeer, - _transport: TransportUnicast, - ) -> ZResult> { - let arc = Arc::new(SCPeer::new(self.count.clone(), self.is_shm)); - Ok(arc) - } + fn get_count(&self) -> usize { + 
self.count.load(Ordering::SeqCst) + } +} - fn new_multicast( - &self, - _transport: TransportMulticast, - ) -> ZResult> { - panic!(); - } +impl TransportEventHandler for SHPeer { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + let arc = Arc::new(SCPeer::new(self.count.clone(), self.is_shm)); + Ok(arc) } - // Transport Callback for the peer - pub struct SCPeer { - count: Arc, - is_shm: bool, + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); } +} - impl SCPeer { - pub fn new(count: Arc, is_shm: bool) -> Self { - Self { count, is_shm } - } +// Transport Callback for the peer +pub struct SCPeer { + count: Arc, + is_shm: bool, +} + +impl SCPeer { + pub fn new(count: Arc, is_shm: bool) -> Self { + Self { count, is_shm } } +} - impl TransportPeerEventHandler for SCPeer { - fn handle_message(&self, message: NetworkMessage) -> ZResult<()> { - if self.is_shm { - print!("s"); - } else { - print!("n"); - } - let payload = match message.body { - NetworkBody::Push(m) => match m.payload { - PushBody::Put(Put { payload, .. }) => { - for zs in payload.zslices() { - if self.is_shm && zs.downcast_ref::().is_none() { - panic!("Expected SharedMemoryBuf: {:?}", zs); - } else if !self.is_shm && zs.downcast_ref::().is_some() - { - panic!("Not Expected SharedMemoryBuf: {:?}", zs); - } +impl TransportPeerEventHandler for SCPeer { + fn handle_message(&self, message: NetworkMessage) -> ZResult<()> { + if self.is_shm { + print!("s"); + } else { + print!("n"); + } + let payload = match message.body { + NetworkBody::Push(m) => match m.payload { + PushBody::Put(Put { payload, .. }) => { + for zs in payload.zslices() { + if self.is_shm && zs.downcast_ref::().is_none() { + panic!("Expected SharedMemoryBuf: {:?}", zs); + } else if !self.is_shm && zs.downcast_ref::().is_some() { + panic!("Not Expected SharedMemoryBuf: {:?}", zs); } - payload.contiguous().into_owned() } - _ => panic!("Unsolicited message"), - }, + payload.contiguous().into_owned() + } _ => panic!("Unsolicited message"), - }; - assert_eq!(payload.len(), MSG_SIZE); - - let mut count_bytes = [0_u8; 8]; - count_bytes.copy_from_slice(&payload[0..8]); - let msg_count = u64::from_le_bytes(count_bytes) as usize; - let sex_count = self.count.fetch_add(1, Ordering::SeqCst); - assert_eq!(msg_count, sex_count); - print!("{msg_count} "); + }, + _ => panic!("Unsolicited message"), + }; + assert_eq!(payload.len(), MSG_SIZE); + + let mut count_bytes = [0_u8; 8]; + count_bytes.copy_from_slice(&payload[0..8]); + let msg_count = u64::from_le_bytes(count_bytes) as usize; + let sex_count = self.count.fetch_add(1, Ordering::SeqCst); + assert_eq!(msg_count, sex_count); + print!("{msg_count} "); + + Ok(()) + } - Ok(()) - } + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closing(&self) {} + fn closed(&self) {} - fn new_link(&self, _link: Link) {} - fn del_link(&self, _link: Link) {} - fn closing(&self) {} - fn closed(&self) {} + fn as_any(&self) -> &dyn Any { + self + } +} - fn as_any(&self) -> &dyn Any { - self +async fn run(endpoint: &EndPoint, lowlatency_transport: bool) { + println!("Transport SHM [0a]: {endpoint:?}"); + + // Define client and router IDs + let peer_shm01 = ZenohId::try_from([1]).unwrap(); + let peer_shm02 = ZenohId::try_from([2]).unwrap(); + let peer_net01 = ZenohId::try_from([3]).unwrap(); + + let mut tries = 100; + let mut prng = PseudoRng::from_entropy(); + let mut shm01 = loop { + // Create the SharedMemoryManager + if let Ok(shm01) 
= SharedMemoryManager::make( + format!("peer_shm01_{}_{}", endpoint.protocol(), prng.gen::()), + 2 * MSG_SIZE, + ) { + break Ok(shm01); + } + tries -= 1; + if tries == 0 { + break Err(zerror!("Unable to create SharedMemoryManager!")); } } + .unwrap(); + + // Create a peer manager with shared-memory authenticator enabled + let peer_shm01_handler = Arc::new(SHPeer::new(true)); + let peer_shm01_manager = TransportManager::builder() + .whatami(WhatAmI::Peer) + .zid(peer_shm01) + .unicast( + TransportManager::config_unicast() + .shm(true) + .lowlatency(lowlatency_transport) + .qos(!lowlatency_transport), + ) + .build(peer_shm01_handler.clone()) + .unwrap(); - async fn run(endpoint: &EndPoint, lowlatency_transport: bool) { - println!("Transport SHM [0a]: {endpoint:?}"); - - // Define client and router IDs - let peer_shm01 = ZenohId::try_from([1]).unwrap(); - let peer_shm02 = ZenohId::try_from([2]).unwrap(); - let peer_net01 = ZenohId::try_from([3]).unwrap(); - - let mut tries = 100; - let mut prng = PseudoRng::from_entropy(); - let mut shm01 = loop { - // Create the SharedMemoryManager - if let Ok(shm01) = SharedMemoryManager::make( - format!("peer_shm01_{}_{}", endpoint.protocol(), prng.gen::()), - 2 * MSG_SIZE, - ) { - break Ok(shm01); - } - tries -= 1; - if tries == 0 { - break Err(zerror!("Unable to create SharedMemoryManager!")); - } - } + // Create a peer manager with shared-memory authenticator enabled + let peer_shm02_handler = Arc::new(SHPeer::new(true)); + let peer_shm02_manager = TransportManager::builder() + .whatami(WhatAmI::Peer) + .zid(peer_shm02) + .unicast( + TransportManager::config_unicast() + .shm(true) + .lowlatency(lowlatency_transport) + .qos(!lowlatency_transport), + ) + .build(peer_shm02_handler.clone()) .unwrap(); - // Create a peer manager with shared-memory authenticator enabled - let peer_shm01_handler = Arc::new(SHPeer::new(true)); - let peer_shm01_manager = TransportManager::builder() - .whatami(WhatAmI::Peer) - .zid(peer_shm01) - .unicast( - TransportManager::config_unicast() - .shm(true) - .lowlatency(lowlatency_transport) - .qos(!lowlatency_transport), - ) - .build(peer_shm01_handler.clone()) - .unwrap(); - - // Create a peer manager with shared-memory authenticator enabled - let peer_shm02_handler = Arc::new(SHPeer::new(true)); - let peer_shm02_manager = TransportManager::builder() - .whatami(WhatAmI::Peer) - .zid(peer_shm02) - .unicast( - TransportManager::config_unicast() - .shm(true) - .lowlatency(lowlatency_transport) - .qos(!lowlatency_transport), - ) - .build(peer_shm02_handler.clone()) - .unwrap(); - - // Create a peer manager with shared-memory authenticator disabled - let peer_net01_handler = Arc::new(SHPeer::new(false)); - let peer_net01_manager = TransportManager::builder() - .whatami(WhatAmI::Peer) - .zid(peer_net01) - .unicast( - TransportManager::config_unicast() - .shm(false) - .lowlatency(lowlatency_transport) - .qos(!lowlatency_transport), - ) - .build(peer_net01_handler.clone()) - .unwrap(); - - // Create the listener on the peer - println!("Transport SHM [1a]"); - let _ = ztimeout!(peer_shm01_manager.add_listener(endpoint.clone())).unwrap(); - - // Create a transport with the peer - println!("Transport SHM [1b]"); - let peer_shm01_transport = - ztimeout!(peer_shm02_manager.open_transport_unicast(endpoint.clone())).unwrap(); - assert!(peer_shm01_transport.is_shm().unwrap()); - - // Create a transport with the peer - println!("Transport SHM [1c]"); - let peer_net02_transport = - 
ztimeout!(peer_net01_manager.open_transport_unicast(endpoint.clone())).unwrap(); - assert!(!peer_net02_transport.is_shm().unwrap()); - - // Retrieve the transports - println!("Transport SHM [2a]"); - let peer_shm02_transport = peer_shm01_manager - .get_transport_unicast(&peer_shm02) - .await - .unwrap(); - assert!(peer_shm02_transport.is_shm().unwrap()); - - println!("Transport SHM [2b]"); - let peer_net01_transport = peer_shm01_manager - .get_transport_unicast(&peer_net01) - .await - .unwrap(); - assert!(!peer_net01_transport.is_shm().unwrap()); - - // Send the message - println!("Transport SHM [3a]"); - // The msg count - for (msg_count, _) in (0..MSG_COUNT).enumerate() { - // Create the message to send - let mut sbuf = ztimeout!(async { - loop { - match shm01.alloc(MSG_SIZE) { - Ok(sbuf) => break sbuf, - Err(_) => tokio::time::sleep(USLEEP).await, - } - } - }); - - let bs = unsafe { sbuf.as_mut_slice() }; - bs[0..8].copy_from_slice(&msg_count.to_le_bytes()); - - let message: NetworkMessage = Push { - wire_expr: "test".into(), - ext_qos: QoSType::new(Priority::default(), CongestionControl::Block, false), - ext_tstamp: None, - ext_nodeid: NodeIdType::default(), - payload: Put { - payload: sbuf.into(), - timestamp: None, - encoding: Encoding::default(), - ext_sinfo: None, - ext_shm: None, - ext_attachment: None, - ext_unknown: vec![], + // Create a peer manager with shared-memory authenticator disabled + let peer_net01_handler = Arc::new(SHPeer::new(false)); + let peer_net01_manager = TransportManager::builder() + .whatami(WhatAmI::Peer) + .zid(peer_net01) + .unicast( + TransportManager::config_unicast() + .shm(false) + .lowlatency(lowlatency_transport) + .qos(!lowlatency_transport), + ) + .build(peer_net01_handler.clone()) + .unwrap(); + + // Create the listener on the peer + println!("Transport SHM [1a]"); + let _ = ztimeout!(peer_shm01_manager.add_listener(endpoint.clone())).unwrap(); + + // Create a transport with the peer + println!("Transport SHM [1b]"); + let peer_shm01_transport = + ztimeout!(peer_shm02_manager.open_transport_unicast(endpoint.clone())).unwrap(); + assert!(peer_shm01_transport.is_shm().unwrap()); + + // Create a transport with the peer + println!("Transport SHM [1c]"); + let peer_net02_transport = + ztimeout!(peer_net01_manager.open_transport_unicast(endpoint.clone())).unwrap(); + assert!(!peer_net02_transport.is_shm().unwrap()); + + // Retrieve the transports + println!("Transport SHM [2a]"); + let peer_shm02_transport = peer_shm01_manager + .get_transport_unicast(&peer_shm02) + .await + .unwrap(); + assert!(peer_shm02_transport.is_shm().unwrap()); + + println!("Transport SHM [2b]"); + let peer_net01_transport = peer_shm01_manager + .get_transport_unicast(&peer_net01) + .await + .unwrap(); + assert!(!peer_net01_transport.is_shm().unwrap()); + + // Send the message + println!("Transport SHM [3a]"); + // The msg count + for (msg_count, _) in (0..MSG_COUNT).enumerate() { + // Create the message to send + let mut sbuf = ztimeout!(async { + loop { + match shm01.alloc(MSG_SIZE) { + Ok(sbuf) => break sbuf, + Err(_) => tokio::time::sleep(USLEEP).await, } - .into(), } - .into(); + }); - peer_shm02_transport.schedule(message).unwrap(); + let bs = unsafe { sbuf.as_mut_slice() }; + bs[0..8].copy_from_slice(&msg_count.to_le_bytes()); + + let message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: QoSType::new(Priority::default(), CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::default(), + payload: Put { + payload: sbuf.into(), + 
timestamp: None, + encoding: Encoding::default(), + ext_sinfo: None, + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + } + .into(), } + .into(); - // Wait a little bit - tokio::time::sleep(SLEEP).await; + peer_shm02_transport.schedule(message).unwrap(); + } - // Wait for the messages to arrive to the other side - println!("Transport SHM [3b]"); - ztimeout!(async { - while peer_shm02_handler.get_count() != MSG_COUNT { - tokio::time::sleep(SLEEP).await; - } - }); + // Wait a little bit + tokio::time::sleep(SLEEP).await; - // Send the message - println!("Transport SHM [4a]"); - // The msg count - for (msg_count, _) in (0..MSG_COUNT).enumerate() { - // Create the message to send - let mut sbuf = ztimeout!(async { - loop { - match shm01.alloc(MSG_SIZE) { - Ok(sbuf) => break sbuf, - Err(_) => tokio::time::sleep(USLEEP).await, - } - } - }); - let bs = unsafe { sbuf.as_mut_slice() }; - bs[0..8].copy_from_slice(&msg_count.to_le_bytes()); - - let message: NetworkMessage = Push { - wire_expr: "test".into(), - ext_qos: QoSType::new(Priority::default(), CongestionControl::Block, false), - ext_tstamp: None, - ext_nodeid: NodeIdType::default(), - payload: Put { - payload: sbuf.into(), - timestamp: None, - encoding: Encoding::default(), - ext_sinfo: None, - ext_shm: None, - ext_attachment: None, - ext_unknown: vec![], + // Wait for the messages to arrive to the other side + println!("Transport SHM [3b]"); + ztimeout!(async { + while peer_shm02_handler.get_count() != MSG_COUNT { + tokio::time::sleep(SLEEP).await; + } + }); + + // Send the message + println!("Transport SHM [4a]"); + // The msg count + for (msg_count, _) in (0..MSG_COUNT).enumerate() { + // Create the message to send + let mut sbuf = ztimeout!(async { + loop { + match shm01.alloc(MSG_SIZE) { + Ok(sbuf) => break sbuf, + Err(_) => tokio::time::sleep(USLEEP).await, } - .into(), } - .into(); - - peer_net01_transport.schedule(message).unwrap(); + }); + let bs = unsafe { sbuf.as_mut_slice() }; + bs[0..8].copy_from_slice(&msg_count.to_le_bytes()); + + let message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: QoSType::new(Priority::default(), CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::default(), + payload: Put { + payload: sbuf.into(), + timestamp: None, + encoding: Encoding::default(), + ext_sinfo: None, + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + } + .into(), } + .into(); - // Wait a little bit - tokio::time::sleep(SLEEP).await; + peer_net01_transport.schedule(message).unwrap(); + } - // Wait for the messages to arrive to the other side - println!("Transport SHM [4b]"); - ztimeout!(async { - while peer_net01_handler.get_count() != MSG_COUNT { - tokio::time::sleep(SLEEP).await; - } - }); + // Wait a little bit + tokio::time::sleep(SLEEP).await; - // Wait a little bit - tokio::time::sleep(SLEEP).await; + // Wait for the messages to arrive to the other side + println!("Transport SHM [4b]"); + ztimeout!(async { + while peer_net01_handler.get_count() != MSG_COUNT { + tokio::time::sleep(SLEEP).await; + } + }); - // Close the transports - println!("Transport SHM [5a]"); - ztimeout!(peer_shm02_transport.close()).unwrap(); + // Wait a little bit + tokio::time::sleep(SLEEP).await; - println!("Transport SHM [5b]"); - ztimeout!(peer_net01_transport.close()).unwrap(); + // Close the transports + println!("Transport SHM [5a]"); + ztimeout!(peer_shm02_transport.close()).unwrap(); - ztimeout!(async { - while !peer_shm01_manager.get_transports_unicast().await.is_empty() { 
- tokio::time::sleep(SLEEP).await; - } - }); + println!("Transport SHM [5b]"); + ztimeout!(peer_net01_transport.close()).unwrap(); - // Delete the listener - println!("Transport SHM [6a]"); - ztimeout!(peer_shm01_manager.del_listener(endpoint)).unwrap(); + ztimeout!(async { + while !peer_shm01_manager.get_transports_unicast().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); - // Wait a little bit - ztimeout!(async { - while !peer_shm01_manager.get_listeners().await.is_empty() { - tokio::time::sleep(SLEEP).await; - } - }); - tokio::time::sleep(SLEEP).await; + // Delete the listener + println!("Transport SHM [6a]"); + ztimeout!(peer_shm01_manager.del_listener(endpoint)).unwrap(); - ztimeout!(peer_net01_manager.close()); - ztimeout!(peer_shm01_manager.close()); - ztimeout!(peer_shm02_manager.close()); + // Wait a little bit + ztimeout!(async { + while !peer_shm01_manager.get_listeners().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); + tokio::time::sleep(SLEEP).await; - // Wait a little bit - tokio::time::sleep(SLEEP).await; - } + ztimeout!(peer_net01_manager.close()); + ztimeout!(peer_shm01_manager.close()); + ztimeout!(peer_shm02_manager.close()); - #[cfg(feature = "transport_tcp")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_tcp_shm() { - zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 14000).parse().unwrap(); - run(&endpoint, false).await; - } + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} - #[cfg(feature = "transport_tcp")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_tcp_shm_with_lowlatency_transport() { - zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 14001).parse().unwrap(); - run(&endpoint, true).await; - } +#[cfg(feature = "transport_tcp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_tcp_shm() { + zenoh_util::try_init_log_from_env(); + let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 14000).parse().unwrap(); + run(&endpoint, false).await; +} - #[cfg(feature = "transport_ws")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_ws_shm() { - zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 14010).parse().unwrap(); - run(&endpoint, false).await; - } +#[cfg(feature = "transport_tcp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_tcp_shm_with_lowlatency_transport() { + zenoh_util::try_init_log_from_env(); + let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 14001).parse().unwrap(); + run(&endpoint, true).await; +} - #[cfg(feature = "transport_ws")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_ws_shm_with_lowlatency_transport() { - zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 14011).parse().unwrap(); - run(&endpoint, true).await; - } +#[cfg(feature = "transport_ws")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_ws_shm() { + zenoh_util::try_init_log_from_env(); + let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 14010).parse().unwrap(); + run(&endpoint, false).await; +} - #[cfg(feature = "transport_unixpipe")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_unixpipe_shm() { - zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = 
"unixpipe/transport_unixpipe_shm".parse().unwrap(); - run(&endpoint, false).await; - } +#[cfg(feature = "transport_ws")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_ws_shm_with_lowlatency_transport() { + zenoh_util::try_init_log_from_env(); + let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 14011).parse().unwrap(); + run(&endpoint, true).await; +} - #[cfg(feature = "transport_unixpipe")] - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn transport_unixpipe_shm_with_lowlatency_transport() { - zenoh_util::try_init_log_from_env(); - let endpoint: EndPoint = "unixpipe/transport_unixpipe_shm_with_lowlatency_transport" - .parse() - .unwrap(); - run(&endpoint, true).await; - } +#[cfg(feature = "transport_unixpipe")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_unixpipe_shm() { + zenoh_util::try_init_log_from_env(); + let endpoint: EndPoint = "unixpipe/transport_unixpipe_shm".parse().unwrap(); + run(&endpoint, false).await; +} + +#[cfg(feature = "transport_unixpipe")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_unixpipe_shm_with_lowlatency_transport() { + zenoh_util::try_init_log_from_env(); + let endpoint: EndPoint = "unixpipe/transport_unixpipe_shm_with_lowlatency_transport" + .parse() + .unwrap(); + run(&endpoint, true).await; } diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index 5cd8224fc2..10e27c2b78 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -11,351 +11,350 @@ // Contributors: // ZettaScale Zenoh Team, // -#[cfg(target_family = "unix")] -mod tests { - use std::any::Any; - use std::convert::TryFrom; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - use std::time::Duration; - use zenoh_core::ztimeout; - use zenoh_link::Link; - use zenoh_protocol::{ - core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohId}, - network::{ - push::ext::{NodeIdType, QoSType}, - NetworkMessage, Push, - }, - zenoh::Put, - }; - use zenoh_result::ZResult; - use zenoh_transport::{ - multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, - TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, - }; - - const TIMEOUT: Duration = Duration::from_secs(60); - const SLEEP: Duration = Duration::from_millis(500); - - const MSG_COUNT: usize = 16; - const MSG_SIZE: usize = 1_024; - - // Transport Handler for the router - struct SHPeer { - zid: ZenohId, - count: Arc, - } - - impl SHPeer { - fn new(zid: ZenohId) -> Self { - Self { - zid, - count: Arc::new(AtomicUsize::new(0)), - } - } +#![cfg(target_family = "unix")] + +use std::any::Any; +use std::convert::TryFrom; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use zenoh_core::ztimeout; +use zenoh_link::Link; +use zenoh_protocol::{ + core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohId}, + network::{ + push::ext::{NodeIdType, QoSType}, + NetworkMessage, Push, + }, + zenoh::Put, +}; +use zenoh_result::ZResult; +use zenoh_transport::{ + multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, + TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_millis(500); + +const MSG_COUNT: 
usize = 16; +const MSG_SIZE: usize = 1_024; + +// Transport Handler for the router +struct SHPeer { + zid: ZenohId, + count: Arc, +} - fn get_count(&self) -> usize { - self.count.load(Ordering::SeqCst) +impl SHPeer { + fn new(zid: ZenohId) -> Self { + Self { + zid, + count: Arc::new(AtomicUsize::new(0)), } } - impl TransportEventHandler for SHPeer { - fn new_unicast( - &self, - _peer: TransportPeer, - transport: TransportUnicast, - ) -> ZResult> { - // Create the message to send - let message: NetworkMessage = Push { - wire_expr: "test".into(), - ext_qos: QoSType::new(Priority::Control, CongestionControl::Block, false), - ext_tstamp: None, - ext_nodeid: NodeIdType::default(), - payload: Put { - payload: vec![0u8; MSG_SIZE].into(), - timestamp: None, - encoding: Encoding::default(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: None, - ext_unknown: vec![], - } - .into(), - } - .into(); + fn get_count(&self) -> usize { + self.count.load(Ordering::SeqCst) + } +} - println!("[Simultaneous {}] Sending {}...", self.zid, MSG_COUNT); - for _ in 0..MSG_COUNT { - transport.schedule(message.clone()).unwrap(); +impl TransportEventHandler for SHPeer { + fn new_unicast( + &self, + _peer: TransportPeer, + transport: TransportUnicast, + ) -> ZResult> { + // Create the message to send + let message: NetworkMessage = Push { + wire_expr: "test".into(), + ext_qos: QoSType::new(Priority::Control, CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::default(), + payload: Put { + payload: vec![0u8; MSG_SIZE].into(), + timestamp: None, + encoding: Encoding::default(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], } - println!("[Simultaneous {}] ... sent {}", self.zid, MSG_COUNT); - - let mh = Arc::new(MHPeer::new(self.count.clone())); - Ok(mh) + .into(), } + .into(); - fn new_multicast( - &self, - _transport: TransportMulticast, - ) -> ZResult> { - panic!(); + println!("[Simultaneous {}] Sending {}...", self.zid, MSG_COUNT); + for _ in 0..MSG_COUNT { + transport.schedule(message.clone()).unwrap(); } + println!("[Simultaneous {}] ... 
sent {}", self.zid, MSG_COUNT); + + let mh = Arc::new(MHPeer::new(self.count.clone())); + Ok(mh) } - struct MHPeer { - count: Arc, + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); } +} - impl MHPeer { - fn new(count: Arc) -> Self { - Self { count } - } +struct MHPeer { + count: Arc, +} + +impl MHPeer { + fn new(count: Arc) -> Self { + Self { count } } +} - impl TransportPeerEventHandler for MHPeer { - fn handle_message(&self, _msg: NetworkMessage) -> ZResult<()> { - self.count.fetch_add(1, Ordering::AcqRel); - Ok(()) - } +impl TransportPeerEventHandler for MHPeer { + fn handle_message(&self, _msg: NetworkMessage) -> ZResult<()> { + self.count.fetch_add(1, Ordering::AcqRel); + Ok(()) + } - fn new_link(&self, _link: Link) {} - fn del_link(&self, _link: Link) {} - fn closing(&self) {} - fn closed(&self) {} + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closing(&self) {} + fn closed(&self) {} - fn as_any(&self) -> &dyn Any { - self - } + fn as_any(&self) -> &dyn Any { + self } +} - async fn transport_simultaneous(endpoint01: Vec, endpoint02: Vec) { - /* [Peers] */ - let peer_id01 = ZenohId::try_from([2]).unwrap(); - let peer_id02 = ZenohId::try_from([3]).unwrap(); - - // Create the peer01 transport manager - let peer_sh01 = Arc::new(SHPeer::new(peer_id01)); - let unicast = TransportManager::config_unicast().max_links(endpoint01.len()); - let peer01_manager = TransportManager::builder() - .whatami(WhatAmI::Peer) - .zid(peer_id01) - .unicast(unicast) - .build(peer_sh01.clone()) - .unwrap(); - - // Create the peer02 transport manager - let peer_sh02 = Arc::new(SHPeer::new(peer_id02)); - let unicast = TransportManager::config_unicast().max_links(endpoint02.len()); - let peer02_manager = TransportManager::builder() - .whatami(WhatAmI::Peer) - .zid(peer_id02) - .unicast(unicast) - .build(peer_sh02.clone()) - .unwrap(); - - // Add the endpoints on the peer01 - for e in endpoint01.iter() { - let res = ztimeout!(peer01_manager.add_listener(e.clone())); - println!("[Simultaneous 01a] => Adding endpoint {e:?}: {res:?}"); - assert!(res.is_ok()); +async fn transport_simultaneous(endpoint01: Vec, endpoint02: Vec) { + /* [Peers] */ + let peer_id01 = ZenohId::try_from([2]).unwrap(); + let peer_id02 = ZenohId::try_from([3]).unwrap(); + + // Create the peer01 transport manager + let peer_sh01 = Arc::new(SHPeer::new(peer_id01)); + let unicast = TransportManager::config_unicast().max_links(endpoint01.len()); + let peer01_manager = TransportManager::builder() + .whatami(WhatAmI::Peer) + .zid(peer_id01) + .unicast(unicast) + .build(peer_sh01.clone()) + .unwrap(); + + // Create the peer02 transport manager + let peer_sh02 = Arc::new(SHPeer::new(peer_id02)); + let unicast = TransportManager::config_unicast().max_links(endpoint02.len()); + let peer02_manager = TransportManager::builder() + .whatami(WhatAmI::Peer) + .zid(peer_id02) + .unicast(unicast) + .build(peer_sh02.clone()) + .unwrap(); + + // Add the endpoints on the peer01 + for e in endpoint01.iter() { + let res = ztimeout!(peer01_manager.add_listener(e.clone())); + println!("[Simultaneous 01a] => Adding endpoint {e:?}: {res:?}"); + assert!(res.is_ok()); + } + let locs = peer01_manager.get_listeners().await; + println!("[Simultaneous 01b] => Getting endpoints: {endpoint01:?} {locs:?}"); + assert_eq!(endpoint01.len(), locs.len()); + + // Add the endpoints on peer02 + for e in endpoint02.iter() { + let res = ztimeout!(peer02_manager.add_listener(e.clone())); + println!("[Simultaneous 02a] 
=> Adding endpoint {e:?}: {res:?}");
+        assert!(res.is_ok());
+    }
+    let locs = peer02_manager.get_listeners().await;
+    println!("[Simultaneous 02b] => Getting endpoints: {endpoint02:?} {locs:?}");
+    assert_eq!(endpoint02.len(), locs.len());
+
+    // Endpoints
+    let c_ep01 = endpoint01.clone();
+    let c_ep02 = endpoint02.clone();
+
+    // Peer01
+    let c_p01m = peer01_manager.clone();
+    let peer01_task = tokio::task::spawn(async move {
+        // Open the transport with the second peer
+        // These opens should succeed
+        for e in c_ep02.iter() {
+            println!("[Simultaneous 01c] => Opening transport with {e:?}...");
+            let _ = ztimeout!(c_p01m.open_transport_unicast(e.clone())).unwrap();
         }
+
+        // These opens should fail
+        for e in c_ep02.iter() {
+            println!("[Simultaneous 01d] => Exceeding transport with {e:?}...");
+            let res = ztimeout!(c_p01m.open_transport_unicast(e.clone()));
+            assert!(res.is_err());
         }
+        tokio::time::sleep(SLEEP).await;
+
+        let tp02 = ztimeout!(async {
+            let mut tp02 = None;
+            while tp02.is_none() {
+                tokio::time::sleep(SLEEP).await;
+                println!(
+                    "[Simultaneous 01e] => Transports: {:?}",
+                    peer01_manager.get_transports_unicast().await
+                );
+                tp02 = peer01_manager.get_transport_unicast(&peer_id02).await;
             }
+            tp02.unwrap()
         });
+
+        // Wait for the links to be properly established
+        ztimeout!(async {
+            let expected = endpoint01.len() + c_ep02.len();
+            let mut tl02 = vec![];
+            while tl02.len() != expected {
+                tokio::time::sleep(SLEEP).await;
+                tl02 = tp02.get_links().unwrap();
+                println!("[Simultaneous 01f] => Links {}/{}", tl02.len(), expected);
             }
+        });
+
+        // Wait for the messages to arrive to peer 01
+        ztimeout!(async {
+            let mut check = 0;
+            while check != MSG_COUNT {
+                tokio::time::sleep(SLEEP).await;
+                check = peer_sh01.get_count();
+                println!("[Simultaneous 01g] => Received {check:?}/{MSG_COUNT:?}");
             }
+        });
     });
+
+    // Peer02
+    let c_p02m = peer02_manager.clone();
+    let peer02_task = tokio::task::spawn(async move {
+        // Open the transport
with the first peer - // These open should succeed - for e in c_ep01.iter() { - println!("[Simultaneous 02c] => Opening transport with {e:?}..."); - let _ = ztimeout!(c_p02m.open_transport_unicast(e.clone())).unwrap(); + // Wait for the links to be properly established + ztimeout!(async { + let expected = endpoint01.len() + c_ep02.len(); + let mut tl02 = vec![]; + while tl02.len() != expected { + tokio::time::sleep(SLEEP).await; + tl02 = tp02.get_links().unwrap(); + println!("[Simultaneous 01f] => Links {}/{}", tl02.len(), expected); } + }); - // These open should fails - for e in c_ep01.iter() { - println!("[Simultaneous 02d] => Exceeding transport with {e:?}..."); - let res = ztimeout!(c_p02m.open_transport_unicast(e.clone())); - assert!(res.is_err()); + // Wait for the messages to arrive to peer 01 + ztimeout!(async { + let mut check = 0; + while check != MSG_COUNT { + tokio::time::sleep(SLEEP).await; + check = peer_sh01.get_count(); + println!("[Simultaneous 01g] => Received {check:?}/{MSG_COUNT:?}"); } - - // Wait a little bit - tokio::time::sleep(SLEEP).await; - - let tp01 = ztimeout!(async { - let mut tp01 = None; - while tp01.is_none() { - tokio::time::sleep(SLEEP).await; - println!( - "[Simultaneous 02e] => Transports: {:?}", - peer02_manager.get_transports_unicast().await - ); - tp01 = peer02_manager.get_transport_unicast(&peer_id01).await; - } - tp01.unwrap() - }); - - // Wait for the links to be properly established - ztimeout!(async { - let expected = c_ep01.len() + endpoint02.len(); - let mut tl01 = vec![]; - while tl01.len() != expected { - tokio::time::sleep(SLEEP).await; - tl01 = tp01.get_links().unwrap(); - println!("[Simultaneous 02f] => Links {}/{}", tl01.len(), expected); - } - }); - - // Wait for the messages to arrive to peer 02 - ztimeout!(async { - let mut check = 0; - while check != MSG_COUNT { - tokio::time::sleep(SLEEP).await; - check = peer_sh02.get_count(); - println!("[Simultaneous 02g] => Received {check:?}/{MSG_COUNT:?}"); - } - }); }); + }); + + // Peer02 + let c_p02m = peer02_manager.clone(); + let peer02_task = tokio::task::spawn(async move { + // Open the transport with the first peer + // These open should succeed + for e in c_ep01.iter() { + println!("[Simultaneous 02c] => Opening transport with {e:?}..."); + let _ = ztimeout!(c_p02m.open_transport_unicast(e.clone())).unwrap(); + } - println!("[Simultaneous] => Waiting for peer01 and peer02 tasks..."); - let _ = tokio::join!(peer01_task, peer02_task); - println!("[Simultaneous] => Waiting for peer01 and peer02 tasks... 
DONE\n");
+        // These opens should fail
+        for e in c_ep01.iter() {
+            println!("[Simultaneous 02d] => Exceeding transport with {e:?}...");
+            let res = ztimeout!(c_p02m.open_transport_unicast(e.clone()));
+            assert!(res.is_err());
+        }
+
+        // Wait a little bit
+        tokio::time::sleep(SLEEP).await;
+
+        let tp01 = ztimeout!(async {
+            let mut tp01 = None;
+            while tp01.is_none() {
+                tokio::time::sleep(SLEEP).await;
+                println!(
+                    "[Simultaneous 02e] => Transports: {:?}",
+                    peer02_manager.get_transports_unicast().await
+                );
+                tp01 = peer02_manager.get_transport_unicast(&peer_id01).await;
             }
+            tp01.unwrap()
         });
+
+        // Wait for the links to be properly established
+        ztimeout!(async {
+            let expected = c_ep01.len() + endpoint02.len();
+            let mut tl01 = vec![];
+            while tl01.len() != expected {
+                tokio::time::sleep(SLEEP).await;
+                tl01 = tp01.get_links().unwrap();
+                println!("[Simultaneous 02f] => Links {}/{}", tl01.len(), expected);
+            }
+        });
+
+        // Wait for the messages to arrive to peer 02
+        ztimeout!(async {
+            let mut check = 0;
+            while check != MSG_COUNT {
+                tokio::time::sleep(SLEEP).await;
+                check = peer_sh02.get_count();
+                println!("[Simultaneous 02g] => Received {check:?}/{MSG_COUNT:?}");
+            }
+        });
}); + + println!("[Simultaneous] => Waiting for peer01 and peer02 tasks..."); + let _ = tokio::join!(peer01_task, peer02_task); + println!("[Simultaneous] => Waiting for peer01 and peer02 tasks... DONE\n"); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +#[cfg(feature = "transport_tcp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_tcp_simultaneous() { + zenoh_util::try_init_log_from_env(); + let endpoint01: Vec = vec![ + format!("tcp/127.0.0.1:{}", 15000).parse().unwrap(), + format!("tcp/127.0.0.1:{}", 15001).parse().unwrap(), + format!("tcp/127.0.0.1:{}", 15002).parse().unwrap(), + format!("tcp/127.0.0.1:{}", 15003).parse().unwrap(), + ]; + let endpoint02: Vec = vec![ + format!("tcp/127.0.0.1:{}", 15010).parse().unwrap(), + format!("tcp/127.0.0.1:{}", 15011).parse().unwrap(), + format!("tcp/127.0.0.1:{}", 15012).parse().unwrap(), + format!("tcp/127.0.0.1:{}", 15013).parse().unwrap(), + ]; + + transport_simultaneous(endpoint01, endpoint02).await; +} + +#[cfg(feature = "transport_unixpipe")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore] +async fn transport_unixpipe_simultaneous() { + zenoh_util::try_init_log_from_env(); + let endpoint01: Vec = vec![ + "unixpipe/transport_unixpipe_simultaneous".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous2".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous3".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous4".parse().unwrap(), + ]; + let endpoint02: Vec = vec![ + "unixpipe/transport_unixpipe_simultaneous5".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous6".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous7".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous8".parse().unwrap(), + ]; + + transport_simultaneous(endpoint01, endpoint02).await; +} + +#[cfg(feature = "transport_ws")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[ignore] +async fn transport_ws_simultaneous() { + zenoh_util::try_init_log_from_env(); + + let endpoint01: Vec = vec![ + format!("ws/127.0.0.1:{}", 15020).parse().unwrap(), + format!("ws/127.0.0.1:{}", 15021).parse().unwrap(), + format!("ws/127.0.0.1:{}", 15022).parse().unwrap(), + format!("ws/127.0.0.1:{}", 15023).parse().unwrap(), + ]; + let endpoint02: Vec = vec![ + format!("ws/127.0.0.1:{}", 15030).parse().unwrap(), + format!("ws/127.0.0.1:{}", 15031).parse().unwrap(), + format!("ws/127.0.0.1:{}", 15032).parse().unwrap(), + format!("ws/127.0.0.1:{}", 15033).parse().unwrap(), + ]; + + transport_simultaneous(endpoint01, endpoint02).await; } diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 2b9ea902f9..2176443c51 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // +#![cfg(feature = "test")] + use std::fmt::Write as _; use std::{ any::Any, @@ -21,7 +23,7 @@ use std::{ }, time::Duration, }; -use zenoh_core::{zcondfeat, ztimeout}; +use zenoh_core::ztimeout; use zenoh_link::Link; use zenoh_protocol::{ core::{ @@ -34,10 +36,11 @@ use zenoh_protocol::{ zenoh::Put, }; use zenoh_result::ZResult; -use zenoh_transport::unicast::TransportManagerBuilderUnicast; use zenoh_transport::{ - multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, - TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, + multicast::TransportMulticast, + 
unicast::{test_helpers::make_transport_manager_builder, TransportUnicast}, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, }; // These keys and certificates below are purposely generated to run TLS and mTLS tests. @@ -237,49 +240,6 @@ const MSG_SIZE_LOWLATENCY: [usize; 2] = [1_024, 65000]; ))] const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; -fn make_transport_manager_builder( - #[cfg(feature = "transport_multilink")] max_links: usize, - #[cfg(feature = "shared-memory")] with_shm: bool, - lowlatency_transport: bool, -) -> TransportManagerBuilderUnicast { - let transport = make_basic_transport_manager_builder( - #[cfg(feature = "shared-memory")] - with_shm, - lowlatency_transport, - ); - - zcondfeat!( - "transport_multilink", - { - println!("...with max links: {}...", max_links); - transport.max_links(max_links) - }, - transport - ) -} - -fn make_basic_transport_manager_builder( - #[cfg(feature = "shared-memory")] with_shm: bool, - lowlatency_transport: bool, -) -> TransportManagerBuilderUnicast { - println!("Create transport manager builder..."); - let config = zcondfeat!( - "shared-memory", - { - println!("...with SHM..."); - TransportManager::config_unicast().shm(with_shm) - }, - TransportManager::config_unicast() - ); - if lowlatency_transport { - println!("...with LowLatency transport..."); - } - match lowlatency_transport { - true => config.lowlatency(true).qos(false), - false => config, - } -} - // Transport Handler for the router struct SHRouter { count: Arc,
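Reviewer note: the `make_transport_manager_builder` and `make_basic_transport_manager_builder` helpers removed in the last hunk are shared rather than deleted. Judging from the new `unicast::test_helpers::make_transport_manager_builder` import and the `#![cfg(feature = "test")]` gate added at the top of unicast_transport.rs, they presumably now live in zenoh-transport's `unicast::test_helpers` module behind the `test` feature. A minimal sketch of that module, assuming the bodies were moved verbatim from the code deleted above (the module path and file location are assumptions, not confirmed by this diff):

// io/zenoh-transport/src/unicast/test_helpers.rs (assumed location, `feature = "test"`)
// Sketch only: bodies mirror the helpers deleted from unicast_transport.rs above.
use zenoh_core::zcondfeat;

use crate::{unicast::TransportManagerBuilderUnicast, TransportManager};

pub fn make_basic_transport_manager_builder(
    #[cfg(feature = "shared-memory")] with_shm: bool,
    lowlatency_transport: bool,
) -> TransportManagerBuilderUnicast {
    println!("Create transport manager builder...");
    // Enable SHM support only when the feature is compiled in.
    let config = zcondfeat!(
        "shared-memory",
        {
            println!("...with SHM...");
            TransportManager::config_unicast().shm(with_shm)
        },
        TransportManager::config_unicast()
    );
    if lowlatency_transport {
        println!("...with LowLatency transport...");
    }
    // The tests always pair LowLatency with QoS disabled, hence qos(false) here.
    match lowlatency_transport {
        true => config.lowlatency(true).qos(false),
        false => config,
    }
}

pub fn make_transport_manager_builder(
    #[cfg(feature = "transport_multilink")] max_links: usize,
    #[cfg(feature = "shared-memory")] with_shm: bool,
    lowlatency_transport: bool,
) -> TransportManagerBuilderUnicast {
    let transport = make_basic_transport_manager_builder(
        #[cfg(feature = "shared-memory")]
        with_shm,
        lowlatency_transport,
    );
    // Cap the number of links per transport only when multilink is compiled in.
    zcondfeat!(
        "transport_multilink",
        {
            println!("...with max links: {}...", max_links);
            transport.max_links(max_links)
        },
        transport
    )
}

Centralizing the builder keeps the feature-flag plumbing (`transport_multilink`, `shared-memory`, LowLatency/QoS) in one place instead of being copy-pasted across the transport test files.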