Skip to content

Commit

Permalink
[Core] Enable concurrent connections between nodes
Browse files Browse the repository at this point in the history
This enables nodes to maintain multiple concurrent connections to each other, each over its own TCP connection, to increase message-processing concurrency. This is controlled by a new configuration option in the `[networking]` section.

This also wraps a few operations in `tokio::task::unconstrained` to reduce unnecessary coop-driven yields on some hot paths.

```
// intentionally empty
```
  • Loading branch information
AhmedSoliman committed Feb 7, 2025
1 parent 3bb6bd1 commit b86c969
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 28 deletions.
5 changes: 3 additions & 2 deletions crates/core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ impl Metadata {
min_version: Version,
) -> Result<Version, ShutdownError> {
let mut recv = self.inner.write_watches[metadata_kind].receive.clone();
let v = recv
.wait_for(|v| *v >= min_version)
// If we are already at the metadata version, avoid tokio's yielding to
// improve tail latencies when this is used in latency-sensitive operations.
let v = tokio::task::unconstrained(recv.wait_for(|v| *v >= min_version))
.await
.map_err(|_| ShutdownError)?;
Ok(*v)
Expand Down
29 changes: 23 additions & 6 deletions crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,19 @@ impl ConnectionManagerInner {
/// Picks one of the known connections to `peer_node_id` at random.
///
/// Returns `None` when there is no entry for the peer, when the peer has fewer
/// than `target_concurrency` connections, when the connection list is empty, or
/// when the chosen entry fails to upgrade.
fn get_random_connection(
&self,
peer_node_id: &GenerationalNodeId,
target_concurrency: usize,
) -> Option<Arc<OwnedConnection>> {
use rand::prelude::IndexedRandom;
self.connection_by_gen_id
.get(peer_node_id)
.and_then(|connections| connections.choose(&mut rand::rng())?.upgrade())
.and_then(|connections| {
// Return `None` while the number of connections is below the target;
// this suggests to the caller that a new connection should be created.
if connections.len() < target_concurrency {
return None;
}
// `choose` yields `None` for an empty list, which `?` propagates.
connections.choose(&mut rand::rng())?.upgrade()
})
}
}

Expand Down Expand Up @@ -294,10 +302,18 @@ impl<T: TransportConnect> ConnectionManager<T> {
&self,
node_id: GenerationalNodeId,
) -> Result<Arc<OwnedConnection>, NetworkError> {
// fail fast if we are connecting to our previous self
if self.metadata.my_node_id().is_same_but_different(&node_id) {
return Err(NetworkError::NodeIsGone(node_id));
}

// find a connection by node_id
let maybe_connection: Option<Arc<OwnedConnection>> = {
let guard = self.inner.lock();
guard.get_random_connection(&node_id)
guard.get_random_connection(
&node_id,
self.networking_options.num_concurrent_connections(),
)
// lock is dropped.
};

Expand Down Expand Up @@ -669,8 +685,8 @@ where
global::get_text_map_propagator(|propagator| propagator.extract(span_ctx))
});

if let Err(e) = router
.call(
if let Err(e) = tokio::task::unconstrained(
router.call(
Incoming::from_parts(
msg,
connection.downgrade(),
Expand All @@ -680,8 +696,9 @@ where
)
.with_parent_context(parent_context),
connection.protocol_version,
)
.await
),
)
.await
{
warn!("Error processing message: {:?}", e);
}
Expand Down
20 changes: 2 additions & 18 deletions crates/core/src/network/transport_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
use std::future::Future;

use futures::{Stream, StreamExt};
use tonic::transport::Channel;
use tracing::trace;

use restate_types::config::NetworkingOptions;
use restate_types::net::AdvertisedAddress;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::node::Message;
use restate_types::GenerationalNodeId;
Expand All @@ -24,8 +22,6 @@ use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
use super::{NetworkError, ProtocolError};
use crate::network::net_util::create_tonic_channel;

type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;

pub trait TransportConnect: Send + Sync + 'static {
fn connect(
&self,
Expand All @@ -42,15 +38,11 @@ pub trait TransportConnect: Send + Sync + 'static {

pub struct GrpcConnector {
networking_options: NetworkingOptions,
channel_cache: DashMap<AdvertisedAddress, Channel>,
}

impl GrpcConnector {
pub fn new(networking_options: NetworkingOptions) -> Self {
Self {
networking_options,
channel_cache: DashMap::default(),
}
Self { networking_options }
}
}

Expand All @@ -67,15 +59,7 @@ impl TransportConnect for GrpcConnector {
let address = nodes_config.find_node_by_id(node_id)?.address.clone();

trace!("Attempting to connect to node {} at {}", node_id, address);
// Do we have a channel in cache for this address?
let channel = match self.channel_cache.get(&address) {
Some(channel) => channel.clone(),
None => self
.channel_cache
.entry(address.clone())
.or_insert_with(|| create_tonic_channel(address, &self.networking_options))
.clone(),
};
let channel = create_tonic_channel(address, &self.networking_options);

// Establish the connection
let mut client = CoreNodeSvcClient::new(channel)
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/network/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ impl<M: Targeted + WireEncode> Outgoing<M, HasConnection> {
let connection = bail_on_error!(self, self.try_upgrade());
let permit = bail_on_none!(
self,
connection.reserve().await,
tokio::task::unconstrained(connection.reserve()).await,
NetworkError::ConnectionClosed(connection.peer())
);

Expand All @@ -438,7 +438,7 @@ impl<M: Targeted + WireEncode> Outgoing<M, HasConnection> {
pub async fn send_timeout(self, timeout: Duration) -> Result<(), NetworkSendError<Self>> {
let send_start = Instant::now();
let connection = bail_on_error!(self, self.try_upgrade());
let permit = match connection.reserve_timeout(timeout).await {
let permit = match tokio::task::unconstrained(connection.reserve_timeout(timeout)).await {
Ok(permit) => permit,
Err(e) => return Err(NetworkSendError::new(self, e)),
};
Expand Down
15 changes: 15 additions & 0 deletions crates/types/src/config/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ pub struct NetworkingOptions {
/// The number of messages that can be queued on the outbound stream of a single
/// connection.
pub outbound_queue_length: NonZeroUsize,

/// # Number of connections between peers
///
/// This is used as a guiding value for how many connections every node can
/// maintain with its peers. With more connections, concurrency of network message
/// processing increases, but it also increases the memory and CPU overhead.
pub num_concurrent_connections: NonZeroUsize,
}

impl NetworkingOptions {
    /// Returns the configured number of connections between peers as a plain `usize`.
    #[inline(always)]
    pub fn num_concurrent_connections(&self) -> usize {
        usize::from(self.num_concurrent_connections)
    }
}

impl Default for NetworkingOptions {
Expand All @@ -80,6 +94,7 @@ impl Default for NetworkingOptions {
http2_keep_alive_interval: Duration::from_secs(5).into(),
http2_keep_alive_timeout: Duration::from_secs(5).into(),
http2_adaptive_window: true,
num_concurrent_connections: NonZeroUsize::new(13).unwrap(),
}
}
}
6 changes: 6 additions & 0 deletions crates/types/src/node_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ impl GenerationalNodeId {
self.encode(buf);
buf.split()
}

/// Returns `true` when `other` has the same plain node-id as `self` but a
/// different generation.
#[inline(always)]
pub fn is_same_but_different(&self, other: &GenerationalNodeId) -> bool {
    let same_plain_id = self.0 == other.0;
    let same_generation = self.1 == other.1;
    same_plain_id && !same_generation
}
}

impl From<GenerationalNodeId> for u64 {
Expand Down

0 comments on commit b86c969

Please sign in to comment.