From bcda8ecbf83b2a6445ae7a505d436b03a952b842 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 31 Jul 2024 15:46:35 +0200 Subject: [PATCH] Fix failing Liveliness Subscriber Undeclaration (#1283) * Undeclare subscribers at the end of liveliness tests * Use `kind` in `Seesion::undeclare_subscriber_inner` * Address review comments --- zenoh-ext/tests/liveliness.rs | 48 +++++++++++++++++++++++++++-------- zenoh/src/api/session.rs | 5 +--- zenoh/tests/liveliness.rs | 44 ++++++++++++++++++++++++++------ 3 files changed, 75 insertions(+), 22 deletions(-) diff --git a/zenoh-ext/tests/liveliness.rs b/zenoh-ext/tests/liveliness.rs index 97dc817394..68d4b1b798 100644 --- a/zenoh-ext/tests/liveliness.rs +++ b/zenoh-ext/tests/liveliness.rs @@ -70,7 +70,7 @@ async fn test_liveliness_querying_subscriber_clique() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -81,12 +81,18 @@ async fn test_liveliness_querying_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -106,7 +112,7 @@ async fn test_liveliness_querying_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -168,7 +174,7 @@ async fn test_liveliness_querying_subscriber_brokered() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -179,12 +185,20 @@ async fn test_liveliness_querying_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); + client3.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -244,7 +258,7 @@ async fn test_liveliness_fetching_subscriber_clique() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -255,12 +269,18 @@ async fn test_liveliness_fetching_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -280,7 +300,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -346,7 +366,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -357,10 +377,18 @@ async fn test_liveliness_fetching_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); + client3.close().await.unwrap(); } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4ca924e023..06f44b8bf5 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1238,10 +1238,7 @@ impl Session { pub(crate) fn undeclare_subscriber_inner(&self, sid: Id, kind: SubscriberKind) -> ZResult<()> { let mut state = zwrite!(self.state); - if let Some(sub_state) = state - .subscribers_mut(SubscriberKind::Subscriber) - .remove(&sid) - { + if let Some(sub_state) = state.subscribers_mut(kind).remove(&sid) { trace!("undeclare_subscriber({:?})", sub_state); for res in state .local_resources diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index 72dab9bd29..4d964cc1cf 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -65,12 +65,17 @@ async fn test_liveliness_subscriber_clique() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + sub.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -114,7 +119,7 @@ async fn test_liveliness_query_clique() { s }; - let _token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(peer2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -123,6 +128,11 @@ async fn test_liveliness_query_clique() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + token.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -141,7 +151,7 @@ async fn test_liveliness_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -190,12 +200,18 @@ async fn test_liveliness_subscriber_brokered() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + sub.undeclare().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -213,7 +229,7 @@ async fn test_liveliness_query_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -252,7 +268,7 @@ async fn test_liveliness_query_brokered() { s }; - let _token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(client2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -261,6 +277,12 @@ async fn test_liveliness_query_brokered() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + token.undeclare().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -295,12 +317,15 @@ async fn test_liveliness_subscriber_local() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + sub.undeclare().await.unwrap(); + peer.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -325,7 +350,7 @@ async fn test_liveliness_query_local() { s }; - let _token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(peer.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -334,4 +359,7 @@ async fn test_liveliness_query_local() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + token.undeclare().await.unwrap(); + peer.close().await.unwrap(); }