Skip to content

Commit

Permalink
Replace trees computation tasks with a worker (eclipse-zenoh#1303)
Browse files Browse the repository at this point in the history
* Replace trees computation tasks with a worker

* Address review comments

* Remove review comments
  • Loading branch information
OlivierHecart authored and gmartin82 committed Aug 19, 2024
1 parent 5580910 commit ca9cce0
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 99 deletions.
80 changes: 40 additions & 40 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::{
any::Any,
collections::{HashMap, HashSet},
sync::{atomic::AtomicU32, Arc},
time::Duration,
};

use token::{token_remove_node, undeclare_simple_token};
Expand Down Expand Up @@ -116,42 +115,21 @@ macro_rules! face_hat_mut {
}
use face_hat_mut;

struct HatTables {
linkstatepeer_subs: HashSet<Arc<Resource>>,
linkstatepeer_tokens: HashSet<Arc<Resource>>,
linkstatepeer_qabls: HashSet<Arc<Resource>>,
linkstatepeers_net: Option<Network>,
linkstatepeers_trees_task: Option<TerminatableTask>,
struct TreesComputationWorker {
_task: TerminatableTask,
tx: flume::Sender<Arc<TablesLock>>,
}

impl Drop for HatTables {
fn drop(&mut self) {
if let Some(mut task) = self.linkstatepeers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
}
}

impl HatTables {
impl TreesComputationWorker {
fn new() -> Self {
Self {
linkstatepeer_subs: HashSet::new(),
linkstatepeer_tokens: HashSet::new(),
linkstatepeer_qabls: HashSet::new(),
linkstatepeers_net: None,
linkstatepeers_trees_task: None,
}
}

fn schedule_compute_trees(&mut self, tables_ref: Arc<TablesLock>) {
if self.linkstatepeers_trees_task.is_none() {
let task = TerminatableTask::spawn(
zenoh_runtime::ZRuntime::Net,
async move {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
let (tx, rx) = flume::bounded::<Arc<TablesLock>>(1);
let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
if let Ok(tables_ref) = rx.recv_async().await {
let mut tables = zwrite!(tables_ref.tables);

tracing::trace!("Compute trees");
Expand All @@ -165,15 +143,37 @@ impl HatTables {
pubsub::pubsub_tree_change(&mut tables, &new_children);
queries::queries_tree_change(&mut tables, &new_children);
token::token_tree_change(&mut tables, &new_children);
drop(tables);
}
}
});
Self { _task: task, tx }
}
}

tracing::trace!("Computations completed");
hat_mut!(tables).linkstatepeers_trees_task = None;
},
TerminatableTask::create_cancellation_token(),
);
self.linkstatepeers_trees_task = Some(task);
struct HatTables {
linkstatepeer_subs: HashSet<Arc<Resource>>,
linkstatepeer_tokens: HashSet<Arc<Resource>>,
linkstatepeer_qabls: HashSet<Arc<Resource>>,
linkstatepeers_net: Option<Network>,
linkstatepeers_trees_worker: TreesComputationWorker,
}

impl HatTables {
fn new() -> Self {
Self {
linkstatepeer_subs: HashSet::new(),
linkstatepeer_tokens: HashSet::new(),
linkstatepeer_qabls: HashSet::new(),
linkstatepeers_net: None,
linkstatepeers_trees_worker: TreesComputationWorker::new(),
}
}

fn schedule_compute_trees(&mut self, tables_ref: Arc<TablesLock>) {
tracing::trace!("Schedule trees computation");
let _ = self.linkstatepeers_trees_worker.tx.try_send(tables_ref);
}
}

pub(crate) struct HatCode {}
Expand Down
115 changes: 56 additions & 59 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::{
collections::{hash_map::DefaultHasher, HashMap, HashSet},
hash::Hasher,
sync::{atomic::AtomicU32, Arc},
time::Duration,
};

use token::{token_linkstate_change, token_remove_node, undeclare_simple_token};
Expand Down Expand Up @@ -117,6 +116,49 @@ macro_rules! face_hat_mut {
}
use face_hat_mut;

struct TreesComputationWorker {
_task: TerminatableTask,
tx: flume::Sender<Arc<TablesLock>>,
}

impl TreesComputationWorker {
fn new(net_type: WhatAmI) -> Self {
let (tx, rx) = flume::bounded::<Arc<TablesLock>>(1);
let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
if let Ok(tables_ref) = rx.recv_async().await {
let mut tables = zwrite!(tables_ref.tables);

tracing::trace!("Compute trees");
let new_children = match net_type {
WhatAmI::Router => hat_mut!(tables)
.routers_net
.as_mut()
.unwrap()
.compute_trees(),
_ => hat_mut!(tables)
.linkstatepeers_net
.as_mut()
.unwrap()
.compute_trees(),
};

tracing::trace!("Compute routes");
pubsub::pubsub_tree_change(&mut tables, &new_children, net_type);
queries::queries_tree_change(&mut tables, &new_children, net_type);
token::token_tree_change(&mut tables, &new_children, net_type);
drop(tables);
}
}
});
Self { _task: task, tx }
}
}

struct HatTables {
router_subs: HashSet<Arc<Resource>>,
linkstatepeer_subs: HashSet<Arc<Resource>>,
Expand All @@ -127,22 +169,11 @@ struct HatTables {
routers_net: Option<Network>,
linkstatepeers_net: Option<Network>,
shared_nodes: Vec<ZenohIdProto>,
routers_trees_task: Option<TerminatableTask>,
linkstatepeers_trees_task: Option<TerminatableTask>,
routers_trees_worker: TreesComputationWorker,
linkstatepeers_trees_worker: TreesComputationWorker,
router_peers_failover_brokering: bool,
}

impl Drop for HatTables {
fn drop(&mut self) {
if let Some(mut task) = self.linkstatepeers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
if let Some(mut task) = self.routers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
}
}

impl HatTables {
fn new(router_peers_failover_brokering: bool) -> Self {
Self {
Expand All @@ -155,8 +186,8 @@ impl HatTables {
routers_net: None,
linkstatepeers_net: None,
shared_nodes: vec![],
routers_trees_task: None,
linkstatepeers_trees_task: None,
routers_trees_worker: TreesComputationWorker::new(WhatAmI::Router),
linkstatepeers_trees_worker: TreesComputationWorker::new(WhatAmI::Peer),
router_peers_failover_brokering,
}
}
Expand Down Expand Up @@ -259,49 +290,15 @@ impl HatTables {
}

fn schedule_compute_trees(&mut self, tables_ref: Arc<TablesLock>, net_type: WhatAmI) {
if (net_type == WhatAmI::Router && self.routers_trees_task.is_none())
|| (net_type == WhatAmI::Peer && self.linkstatepeers_trees_task.is_none())
{
let task = TerminatableTask::spawn(
zenoh_runtime::ZRuntime::Net,
async move {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
let mut tables = zwrite!(tables_ref.tables);

tracing::trace!("Compute trees");
let new_children = match net_type {
WhatAmI::Router => hat_mut!(tables)
.routers_net
.as_mut()
.unwrap()
.compute_trees(),
_ => hat_mut!(tables)
.linkstatepeers_net
.as_mut()
.unwrap()
.compute_trees(),
};

tracing::trace!("Compute routes");
pubsub::pubsub_tree_change(&mut tables, &new_children, net_type);
queries::queries_tree_change(&mut tables, &new_children, net_type);
token::token_tree_change(&mut tables, &new_children, net_type);

tracing::trace!("Computations completed");
match net_type {
WhatAmI::Router => hat_mut!(tables).routers_trees_task = None,
_ => hat_mut!(tables).linkstatepeers_trees_task = None,
};
},
TerminatableTask::create_cancellation_token(),
);
match net_type {
WhatAmI::Router => self.routers_trees_task = Some(task),
_ => self.linkstatepeers_trees_task = Some(task),
};
tracing::trace!("Schedule trees computation");
match net_type {
WhatAmI::Router => {
let _ = self.routers_trees_worker.tx.try_send(tables_ref);
}
WhatAmI::Peer => {
let _ = self.linkstatepeers_trees_worker.tx.try_send(tables_ref);
}
_ => (),
}
}
}
Expand Down

0 comments on commit ca9cce0

Please sign in to comment.