From 0ba6a65216ba8038781d4a1f457c8354b1af0551 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 8 Aug 2024 17:43:05 +0200 Subject: [PATCH 1/3] Replace trees computation tasks with a worker --- .../src/net/routing/hat/linkstate_peer/mod.rs | 60 ++++++------ zenoh/src/net/routing/hat/router/mod.rs | 93 ++++++++++--------- 2 files changed, 83 insertions(+), 70 deletions(-) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index b1eeca261f..ac36759757 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -112,45 +112,51 @@ struct HatTables { peer_subs: HashSet>, peer_qabls: HashSet>, peers_net: Option, - peers_trees_task: Option, + peers_trees_task: (TerminatableTask, flume::Sender>), } impl HatTables { fn new() -> Self { + fn spawn_trees_worker() -> (TerminatableTask, flume::Sender>) { + let (sender, receiver) = flume::bounded::>(1); + let task = TerminatableTask::spawn( + zenoh_runtime::ZRuntime::Net, + async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = receiver.recv_async().await { + let mut tables = zwrite!(tables_ref.tables); + + tracing::trace!("Compute trees"); + let new_children = + hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(); + + tracing::trace!("Compute routes"); + pubsub::pubsub_tree_change(&mut tables, &new_children); + queries::queries_tree_change(&mut tables, &new_children); + drop(tables); + } + } + }, + TerminatableTask::create_cancellation_token(), + ); + (task, sender) + } + Self { peer_subs: HashSet::new(), peer_qabls: HashSet::new(), peers_net: None, - peers_trees_task: None, + peers_trees_task: spawn_trees_worker(), } } fn schedule_compute_trees(&mut self, tables_ref: Arc) { - tracing::trace!("Schedule computations"); - if self.peers_trees_task.is_none() { - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; - let mut tables = zwrite!(tables_ref.tables); - - tracing::trace!("Compute trees"); - let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(); - - tracing::trace!("Compute routes"); - pubsub::pubsub_tree_change(&mut tables, &new_children); - queries::queries_tree_change(&mut tables, &new_children); - - tracing::trace!("Computations completed"); - hat_mut!(tables).peers_trees_task = None; - }, - TerminatableTask::create_cancellation_token(), - ); - self.peers_trees_task = Some(task); - } + tracing::trace!("Schedule trees computation"); + let _ = self.peers_trees_task.1.try_send(tables_ref); } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index b4b88d66e9..c9f9399dd8 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -121,13 +121,50 @@ struct HatTables { routers_net: Option, peers_net: Option, shared_nodes: Vec, - routers_trees_task: Option, - peers_trees_task: Option, + routers_trees_task: (TerminatableTask, flume::Sender>), + peers_trees_task: (TerminatableTask, flume::Sender>), router_peers_failover_brokering: bool, } impl HatTables { fn new(router_peers_failover_brokering: bool) -> Self { + fn spawn_trees_worker( + net_type: WhatAmI, + ) -> (TerminatableTask, flume::Sender>) { + let (sender, receiver) = flume::bounded::>(1); + let task = TerminatableTask::spawn( + zenoh_runtime::ZRuntime::Net, + async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = receiver.recv_async().await { + let mut tables = zwrite!(tables_ref.tables); + + tracing::trace!("Compute trees"); + let new_children = match net_type { + WhatAmI::Router => hat_mut!(tables) + .routers_net + .as_mut() + .unwrap() + .compute_trees(), + _ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(), + }; + + tracing::trace!("Compute routes"); + pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); + queries::queries_tree_change(&mut tables, &new_children, net_type); + drop(tables); + } + } + }, + TerminatableTask::create_cancellation_token(), + ); + (task, sender) + } + Self { router_subs: HashSet::new(), peer_subs: HashSet::new(), @@ -136,8 +173,8 @@ impl HatTables { routers_net: None, peers_net: None, shared_nodes: vec![], - routers_trees_task: None, - peers_trees_task: None, + routers_trees_task: spawn_trees_worker(WhatAmI::Router), + peers_trees_task: spawn_trees_worker(WhatAmI::Peer), router_peers_failover_brokering, } } @@ -240,45 +277,15 @@ impl HatTables { } fn schedule_compute_trees(&mut self, tables_ref: Arc, net_type: WhatAmI) { - tracing::trace!("Schedule computations"); - if (net_type == WhatAmI::Router && self.routers_trees_task.is_none()) - || (net_type == WhatAmI::Peer && self.peers_trees_task.is_none()) - { - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; - let mut tables = zwrite!(tables_ref.tables); - - tracing::trace!("Compute trees"); - let new_children = match net_type { - WhatAmI::Router => hat_mut!(tables) - .routers_net - .as_mut() - .unwrap() - .compute_trees(), - _ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(), - }; - - tracing::trace!("Compute routes"); - pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); - queries::queries_tree_change(&mut tables, &new_children, net_type); - - tracing::trace!("Computations completed"); - match net_type { - WhatAmI::Router => hat_mut!(tables).routers_trees_task = None, - _ => hat_mut!(tables).peers_trees_task = None, - }; - }, - TerminatableTask::create_cancellation_token(), - ); - match net_type { - WhatAmI::Router => self.routers_trees_task = Some(task), - _ => self.peers_trees_task = Some(task), - }; + tracing::trace!("Schedule trees computation"); + match net_type { + WhatAmI::Router => { + let _ = self.routers_trees_task.1.try_send(tables_ref); + } + WhatAmI::Peer => { + let _ = self.peers_trees_task.1.try_send(tables_ref); + } + _ => (), } } } From 4b9819bcae48cf7c16949c04614fdc4ca9bb4ef1 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 11:40:48 +0200 Subject: [PATCH 2/3] Address review comments --- .../src/net/routing/hat/linkstate_peer/mod.rs | 67 +++++++------- zenoh/src/net/routing/hat/router/mod.rs | 88 ++++++++++--------- 2 files changed, 80 insertions(+), 75 deletions(-) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index ac36759757..12936e6166 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -108,55 +108,58 @@ macro_rules! face_hat_mut { } use face_hat_mut; +struct TreesComputationWorker { + _task: TerminatableTask, + tx: flume::Sender>, +} + +impl TreesComputationWorker { + fn new() -> Self { + let (tx, rx) = flume::bounded::>(1); + // NOTE: it seems that we don't care about the cancellation token, so we can use `spawn_abortable` here + let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = rx.recv_async().await { + let mut tables = zwrite!(tables_ref.tables); + + tracing::trace!("Compute trees"); + let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(); + + tracing::trace!("Compute routes"); + pubsub::pubsub_tree_change(&mut tables, &new_children); + queries::queries_tree_change(&mut tables, &new_children); + drop(tables); + } + } + }); + Self { _task: task, tx } + } +} + struct HatTables { peer_subs: HashSet>, peer_qabls: HashSet>, peers_net: Option, - peers_trees_task: (TerminatableTask, flume::Sender>), + peers_trees_worker: TreesComputationWorker, } impl HatTables { fn new() -> Self { - fn spawn_trees_worker() -> (TerminatableTask, flume::Sender>) { - let (sender, receiver) = flume::bounded::>(1); - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - loop { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; - if let Ok(tables_ref) = receiver.recv_async().await { - let mut tables = zwrite!(tables_ref.tables); - - tracing::trace!("Compute trees"); - let new_children = - hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(); - - tracing::trace!("Compute routes"); - pubsub::pubsub_tree_change(&mut tables, &new_children); - queries::queries_tree_change(&mut tables, &new_children); - drop(tables); - } - } - }, - TerminatableTask::create_cancellation_token(), - ); - (task, sender) - } - Self { peer_subs: HashSet::new(), peer_qabls: HashSet::new(), peers_net: None, - peers_trees_task: spawn_trees_worker(), + peers_trees_worker: TreesComputationWorker::new(), } } fn schedule_compute_trees(&mut self, tables_ref: Arc) { tracing::trace!("Schedule trees computation"); - let _ = self.peers_trees_task.1.try_send(tables_ref); + let _ = self.peers_trees_worker.tx.try_send(tables_ref); } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index c9f9399dd8..7b0425886a 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -113,6 +113,45 @@ macro_rules! face_hat_mut { } use face_hat_mut; +struct TreesComputationWorker { + _task: TerminatableTask, + tx: flume::Sender>, +} + +impl TreesComputationWorker { + fn new(net_type: WhatAmI) -> Self { + let (tx, rx) = flume::bounded::>(1); + // NOTE: it seems that we don't care about the cancellation token, so we can use `spawn_abortable` here + let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = rx.recv_async().await { + let mut tables = zwrite!(tables_ref.tables); + + tracing::trace!("Compute trees"); + let new_children = match net_type { + WhatAmI::Router => hat_mut!(tables) + .routers_net + .as_mut() + .unwrap() + .compute_trees(), + _ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(), + }; + + tracing::trace!("Compute routes"); + pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); + queries::queries_tree_change(&mut tables, &new_children, net_type); + drop(tables); + } + } + }); + Self { _task: task, tx } + } +} + struct HatTables { router_subs: HashSet>, peer_subs: HashSet>, @@ -121,50 +160,13 @@ struct HatTables { routers_net: Option, peers_net: Option, shared_nodes: Vec, - routers_trees_task: (TerminatableTask, flume::Sender>), - peers_trees_task: (TerminatableTask, flume::Sender>), + routers_trees_worker: TreesComputationWorker, + peers_trees_worker: TreesComputationWorker, router_peers_failover_brokering: bool, } impl HatTables { fn new(router_peers_failover_brokering: bool) -> Self { - fn spawn_trees_worker( - net_type: WhatAmI, - ) -> (TerminatableTask, flume::Sender>) { - let (sender, receiver) = flume::bounded::>(1); - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - loop { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; - if let Ok(tables_ref) = receiver.recv_async().await { - let mut tables = zwrite!(tables_ref.tables); - - tracing::trace!("Compute trees"); - let new_children = match net_type { - WhatAmI::Router => hat_mut!(tables) - .routers_net - .as_mut() - .unwrap() - .compute_trees(), - _ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(), - }; - - tracing::trace!("Compute routes"); - pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); - queries::queries_tree_change(&mut tables, &new_children, net_type); - drop(tables); - } - } - }, - TerminatableTask::create_cancellation_token(), - ); - (task, sender) - } - Self { router_subs: HashSet::new(), peer_subs: HashSet::new(), @@ -173,8 +175,8 @@ impl HatTables { routers_net: None, peers_net: None, shared_nodes: vec![], - routers_trees_task: spawn_trees_worker(WhatAmI::Router), - peers_trees_task: spawn_trees_worker(WhatAmI::Peer), + routers_trees_worker: TreesComputationWorker::new(WhatAmI::Router), + peers_trees_worker: TreesComputationWorker::new(WhatAmI::Peer), router_peers_failover_brokering, } } @@ -280,10 +282,10 @@ impl HatTables { tracing::trace!("Schedule trees computation"); match net_type { WhatAmI::Router => { - let _ = self.routers_trees_task.1.try_send(tables_ref); + let _ = self.routers_trees_worker.tx.try_send(tables_ref); } WhatAmI::Peer => { - let _ = self.peers_trees_task.1.try_send(tables_ref); + let _ = self.peers_trees_worker.tx.try_send(tables_ref); } _ => (), } From 21067e45be46c679b4b78b0c7777b7a298c9a8b5 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 14:57:04 +0200 Subject: [PATCH 3/3] Remove review comments --- zenoh/src/net/routing/hat/linkstate_peer/mod.rs | 1 - zenoh/src/net/routing/hat/router/mod.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 12936e6166..41e1b26e72 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -116,7 +116,6 @@ struct TreesComputationWorker { impl TreesComputationWorker { fn new() -> Self { let (tx, rx) = flume::bounded::>(1); - // NOTE: it seems that we don't care about the cancellation token, so we can use `spawn_abortable` here let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { loop { tokio::time::sleep(std::time::Duration::from_millis( diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 7b0425886a..407562425e 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -121,7 +121,6 @@ struct TreesComputationWorker { impl TreesComputationWorker { fn new(net_type: WhatAmI) -> Self { let (tx, rx) = flume::bounded::>(1); - // NOTE: it seems that we don't care about the cancellation token, so we can use `spawn_abortable` here let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { loop { tokio::time::sleep(std::time::Duration::from_millis(