diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs index 08fde62e1..8fccd07c8 100644 --- a/crates/core/src/metadata.rs +++ b/crates/core/src/metadata.rs @@ -216,8 +216,9 @@ impl Metadata { min_version: Version, ) -> Result { let mut recv = self.inner.write_watches[metadata_kind].receive.clone(); - let v = recv - .wait_for(|v| *v >= min_version) + // If we are already at the metadata version, avoid tokio's yielding to + // improve tail latencies when this is used in latency-sensitive operations. + let v = tokio::task::unconstrained(recv.wait_for(|v| *v >= min_version)) .await .map_err(|_| ShutdownError)?; Ok(*v) diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 0711d6363..b3d68ba80 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -99,11 +99,19 @@ impl ConnectionManagerInner { fn get_random_connection( &self, peer_node_id: &GenerationalNodeId, + target_concurrency: usize, ) -> Option> { use rand::prelude::IndexedRandom; self.connection_by_gen_id .get(peer_node_id) - .and_then(|connections| connections.choose(&mut rand::rng())?.upgrade()) + .and_then(|connections| { + // Suggest we create new connection if the number + // of connections is below the target + if connections.len() < target_concurrency { + return None; + } + connections.choose(&mut rand::rng())?.upgrade() + }) } } @@ -294,10 +302,18 @@ impl ConnectionManager { &self, node_id: GenerationalNodeId, ) -> Result, NetworkError> { + // fail fast if we are connecting to our previous self + if self.metadata.my_node_id().is_same_but_different(&node_id) { + return Err(NetworkError::NodeIsGone(node_id)); + } + // find a connection by node_id let maybe_connection: Option> = { let guard = self.inner.lock(); - guard.get_random_connection(&node_id) + guard.get_random_connection( + &node_id, + self.networking_options.num_concurrent_connections(), + ) // lock is dropped. }; @@ -669,8 +685,8 @@ where global::get_text_map_propagator(|propagator| propagator.extract(span_ctx)) }); - if let Err(e) = router - .call( + if let Err(e) = tokio::task::unconstrained( + router.call( Incoming::from_parts( msg, connection.downgrade(), @@ -680,8 +696,9 @@ where ) .with_parent_context(parent_context), connection.protocol_version, - ) - .await + ), + ) + .await { warn!("Error processing message: {:?}", e); } diff --git a/crates/core/src/network/transport_connector.rs b/crates/core/src/network/transport_connector.rs index 953c9386d..085305176 100644 --- a/crates/core/src/network/transport_connector.rs +++ b/crates/core/src/network/transport_connector.rs @@ -11,11 +11,9 @@ use std::future::Future; use futures::{Stream, StreamExt}; -use tonic::transport::Channel; use tracing::trace; use restate_types::config::NetworkingOptions; -use restate_types::net::AdvertisedAddress; use restate_types::nodes_config::NodesConfiguration; use restate_types::protobuf::node::Message; use restate_types::GenerationalNodeId; @@ -24,8 +22,6 @@ use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient; use super::{NetworkError, ProtocolError}; use crate::network::net_util::create_tonic_channel; -type DashMap = dashmap::DashMap; - pub trait TransportConnect: Send + Sync + 'static { fn connect( &self, @@ -42,15 +38,11 @@ pub trait TransportConnect: Send + Sync + 'static { pub struct GrpcConnector { networking_options: NetworkingOptions, - channel_cache: DashMap, } impl GrpcConnector { pub fn new(networking_options: NetworkingOptions) -> Self { - Self { - networking_options, - channel_cache: DashMap::default(), - } + Self { networking_options } } } @@ -67,15 +59,7 @@ impl TransportConnect for GrpcConnector { let address = nodes_config.find_node_by_id(node_id)?.address.clone(); trace!("Attempting to connect to node {} at {}", node_id, address); - // Do we have a channel in cache for this address? - let channel = match self.channel_cache.get(&address) { - Some(channel) => channel.clone(), - None => self - .channel_cache - .entry(address.clone()) - .or_insert_with(|| create_tonic_channel(address, &self.networking_options)) - .clone(), - }; + let channel = create_tonic_channel(address, &self.networking_options); // Establish the connection let mut client = CoreNodeSvcClient::new(channel) diff --git a/crates/core/src/network/types.rs b/crates/core/src/network/types.rs index a0a9f7870..cc01e8a4b 100644 --- a/crates/core/src/network/types.rs +++ b/crates/core/src/network/types.rs @@ -421,7 +421,7 @@ impl Outgoing { let connection = bail_on_error!(self, self.try_upgrade()); let permit = bail_on_none!( self, - connection.reserve().await, + tokio::task::unconstrained(connection.reserve()).await, NetworkError::ConnectionClosed(connection.peer()) ); @@ -438,7 +438,7 @@ impl Outgoing { pub async fn send_timeout(self, timeout: Duration) -> Result<(), NetworkSendError> { let send_start = Instant::now(); let connection = bail_on_error!(self, self.try_upgrade()); - let permit = match connection.reserve_timeout(timeout).await { + let permit = match tokio::task::unconstrained(connection.reserve_timeout(timeout)).await { Ok(permit) => permit, Err(e) => return Err(NetworkSendError::new(self, e)), }; diff --git a/crates/types/src/config/networking.rs b/crates/types/src/config/networking.rs index 13a76859e..47b9d583a 100644 --- a/crates/types/src/config/networking.rs +++ b/crates/types/src/config/networking.rs @@ -63,6 +63,20 @@ pub struct NetworkingOptions { /// The number of messages that can be queued on the outbound stream of a single /// connection. pub outbound_queue_length: NonZeroUsize, + + /// # Number of connections between peers + /// + /// This is used as a guiding value for how many connections every node can + /// maintain with its peers. With more connections, concurrency of network message + /// processing increases, but it also increases the memory and CPU overhead. + pub num_concurrent_connections: NonZeroUsize, +} + +impl NetworkingOptions { + #[inline(always)] + pub fn num_concurrent_connections(&self) -> usize { + self.num_concurrent_connections.get() + } } impl Default for NetworkingOptions { @@ -80,6 +94,7 @@ impl Default for NetworkingOptions { http2_keep_alive_interval: Duration::from_secs(5).into(), http2_keep_alive_timeout: Duration::from_secs(5).into(), http2_adaptive_window: true, + num_concurrent_connections: NonZeroUsize::new(13).unwrap(), } } } diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index f2fe02891..1646ab5af 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -106,6 +106,12 @@ impl GenerationalNodeId { self.encode(buf); buf.split() } + + /// Same plain node-id but not the same generation + #[inline(always)] + pub fn is_same_but_different(&self, other: &GenerationalNodeId) -> bool { + self.0 == other.0 && self.1 != other.1 + } } impl From for u64 {