Skip to content

Commit

Permalink
Start link tx_task before notifying router
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Jun 7, 2024
1 parent c279982 commit a0ce56e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
6 changes: 4 additions & 2 deletions io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,19 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
drop(guard);

// create a callback to start the link
let start_link = Box::new(move || {
let start_tx = Box::new(move || {
// start keepalive task
let keep_alive =
self.manager.config.unicast.lease / self.manager.config.unicast.keep_alive as u32;
self.start_keepalive(keep_alive);
});

let start_rx = Box::new(move || {
// start RX task
self.internal_start_rx(other_lease);
});

Ok((start_link, ack))
Ok((start_tx, start_rx, ack))
}

/*************************************/
Expand Down
13 changes: 9 additions & 4 deletions io/zenoh-transport/src/unicast/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl TransportManager {
}

// Add the link to the transport
let (start_tx_rx, ack) = transport
let (start_tx, start_rx, ack) = transport
.add_link(link, other_initial_sn, other_lease)
.await
.map_err(InitTransportError::Link)?;
Expand All @@ -456,10 +456,12 @@ impl TransportManager {
.await
.map_err(|e| InitTransportError::Transport((e, c_t, close::reason::GENERIC)))?;

start_tx();

// notify transport's callback interface that there is a new link
Self::notify_new_link_unicast(&transport, c_link);

start_tx_rx();
start_rx();

Ok(transport)
}
Expand Down Expand Up @@ -548,7 +550,8 @@ impl TransportManager {
};

// Add the link to the transport
let (start_tx_rx, ack) = match t.add_link(link, other_initial_sn, other_lease).await {
let (start_tx, start_rx, ack) = match t.add_link(link, other_initial_sn, other_lease).await
{
Ok(val) => val,
Err(e) => {
let _ = t.close(e.2).await;
Expand All @@ -575,6 +578,8 @@ impl TransportManager {
guard.insert(config.zid, t.clone());
drop(guard);

start_tx();

// Notify manager's interface that there is a new transport
transport_error!(
self.notify_new_transport_unicast(&t),
Expand All @@ -584,7 +589,7 @@ impl TransportManager {
// Notify transport's callback interface that there is a new link
Self::notify_new_link_unicast(&t, c_link);

start_tx_rx();
start_rx();

zcondfeat!(
"shared-memory",
Expand Down
10 changes: 8 additions & 2 deletions io/zenoh-transport/src/unicast/transport_unicast_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ pub(crate) enum InitTransportError {
Transport(TransportError),
}

pub(crate) type AddLinkResult<'a> =
Result<(Box<dyn FnOnce() + Send + Sync + 'a>, MaybeOpenAck), LinkError>;
pub(crate) type AddLinkResult<'a> = Result<
(
Box<dyn FnOnce() + Send + Sync + 'a>,
Box<dyn FnOnce() + Send + Sync + 'a>,
MaybeOpenAck,
),
LinkError,
>;
pub(crate) type InitTransportResult = Result<Arc<dyn TransportUnicastTrait>, InitTransportError>;

/*************************************/
Expand Down
10 changes: 7 additions & 3 deletions io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,21 @@ impl TransportUnicastTrait for TransportUnicastUniversal {

// create a callback to start the link
let transport = self.clone();
let start_link = Box::new(move || {
let mut c_link = link.clone();
let c_transport = transport.clone();
let start_tx = Box::new(move || {
// Start the TX loop
let keep_alive =
self.manager.config.unicast.lease / self.manager.config.unicast.keep_alive as u32;
link.start_tx(transport.clone(), consumer, keep_alive);
c_link.start_tx(c_transport, consumer, keep_alive);
});

let start_rx = Box::new(move || {
// Start the RX loop
link.start_rx(transport, other_lease);
});

Ok((start_link, ack))
Ok((start_tx, start_rx, ack))
}

/*************************************/
Expand Down

0 comments on commit a0ce56e

Please sign in to comment.