Merge branch 'rumenov/adtrdsah' into 'master'
fix: improve the error space and add the possibility of graceful shutdown

I am thinking that we should chain the graceful shutdown of all components.

More specifically, I would imagine that the replica shuts down the transport endpoint, and then all transport and p2p event loops are able to exit gracefully.

I also added some reasoning about the design of the transport. 
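A minimal sketch of the chaining idea (hypothetical names and wiring, not the actual replica code): the owner of an event loop exposes a shutdown handle; calling it sends a oneshot acknowledgment channel into the loop, which cleans up, acknowledges, and exits, so the caller can tear down dependent components in order.

use tokio::sync::{mpsc, oneshot};

struct ShutdownHandle(mpsc::Sender<oneshot::Sender<()>>);

impl ShutdownHandle {
    async fn shutdown(&self) {
        let (ack_tx, ack_rx) = oneshot::channel();
        // If the event loop is already gone, there is nothing left to wait for.
        if self.0.send(ack_tx).await.is_ok() {
            let _ = ack_rx.await;
        }
    }
}

async fn event_loop(mut shutdown_rx: mpsc::Receiver<oneshot::Sender<()>>) {
    loop {
        tokio::select! {
            Some(ack) = shutdown_rx.recv() => {
                // Close connections / drain work here, then acknowledge and exit.
                let _ = ack.send(());
                break;
            }
            else => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);
    let handle = ShutdownHandle(tx);
    let transport_loop = tokio::spawn(event_loop(rx));
    handle.shutdown().await;       // e.g. replica -> transport
    let _ = transport_loop.await;  // the event loop has exited gracefully
}

In this commit, that role is played by the ShutdownHandle added to the connection manager below.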

See merge request dfinity-lab/public/ic!16316
rumenov committed Dec 27, 2023
2 parents 066f10b + f9928ec commit ac1a11d
Showing 8 changed files with 278 additions and 92 deletions.
6 changes: 1 addition & 5 deletions rs/p2p/consensus_manager/src/sender.rs
@@ -536,11 +536,7 @@ mod tests {
mock_transport
.expect_push()
.times(5)
.returning(move |_, _| {
Err(SendError::ConnectionNotFound {
reason: String::new(),
})
})
.returning(move |_, _| Err(SendError::ConnectionUnavailable(String::new())))
.in_sequence(&mut seq);
mock_transport
.expect_push()
16 changes: 9 additions & 7 deletions rs/p2p/memory_transport/src/lib.rs
@@ -282,7 +282,9 @@ impl Transport for PeerTransport {
mut request: Request<Bytes>,
) -> Result<Response<Bytes>, SendError> {
if peer_id == &self.node_id {
panic!("Should not happen");
return Err(SendError::ConnectionUnavailable(
"Can't connect to self".to_string(),
));
}

let (oneshot_tx, oneshot_rx) = oneshot::channel();
@@ -292,15 +294,15 @@
.send((request, *peer_id, oneshot_tx))
.is_err()
{
return Err(SendError::SendRequestFailed {
reason: String::from("router channel closed"),
});
return Err(SendError::ConnectionUnavailable(String::from(
"router channel closed",
)));
}
match oneshot_rx.await {
Ok(r) => Ok(r),
Err(_) => Err(SendError::RecvResponseFailed {
reason: "channel closed".to_owned(),
}),
Err(_) => Err(SendError::ConnectionUnavailable(String::from(
"channel closed",
))),
}
}

19 changes: 19 additions & 0 deletions rs/p2p/quic_transport/README.adoc
@@ -68,3 +68,22 @@ A nice writeup about async and blocking operations can be found in https://ryhl.
1. Use QUIC to satisfy the first two requirements ("Reliable data delivery" and "Multiplexing").
2. Open a new QUIC stream for each message, instead of one stream per handler/URI. TODO: explain. (A sketch of this per-message stream pattern follows this list.)
3. Handlers are always ready to process messages, hence the router is always ready to accept streams. TODO: explain.
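
As a rough illustration of point 2 only (not the crate's actual helper; it assumes a quinn 0.10-style API like the one used in connection_handle.rs in this commit, with anyhow used purely to keep error handling short):

async fn rpc_once(conn: &quinn::Connection, payload: &[u8]) -> anyhow::Result<Vec<u8>> {
    // A fresh bidirectional stream per message, so messages never block each other.
    let (mut send, recv) = conn.open_bi().await?;
    send.write_all(payload).await?;
    // Finishing the stream signals the end of the request to the peer.
    send.finish().await?;
    // Bounded read of the response.
    let response = recv.read_to_end(8 * 1024 * 1024).await?;
    Ok(response)
}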

=== Breaking dependency cycles in P2P protocols ===

In any design that uses a single connection for inbound and outbound traffic, with
a designated listener and dialer, there exists a circular dependency between the read and write paths.
There are different approaches to breaking this dependency.

Each IC P2P protocol has a single event loop that drives its outbound traffic.
This follows the libp2p philosophy.
Hence, only those event loops need access to the `QuicTransport` object.
In this model, handlers can have a channel to the main event loop.

An alternative approach (the one that SUI takes) is to hand out weak references
to the transport object that can be used directly in the handlers. As a result,
when handlers take such a weak reference, the transport object needs to be instantiated first
and started later with the already constructed router.
This approach lets the handlers do most of the work, potentially eliminating the need
for the event loop from the first approach.
However, it comes at the cost of more shared state and more contention.
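
A minimal sketch contrasting the two wirings, with hypothetical types and names (not the actual IC or SUI code):

use std::sync::{Arc, Weak};
use tokio::sync::mpsc;

struct QuicTransport;

impl QuicTransport {
    async fn push(&self, _msg: Vec<u8>) {
        // Would open a QUIC stream and send the message.
    }
}

// Approach 1 (IC / libp2p style): only the event loop owns the transport and
// drives outbound traffic; handlers merely hold a channel to it.
async fn outbound_event_loop(transport: QuicTransport, mut from_handlers: mpsc::Receiver<Vec<u8>>) {
    while let Some(msg) = from_handlers.recv().await {
        transport.push(msg).await;
    }
}

// Approach 2 (SUI style): handlers hold a weak reference to the transport and
// push directly, so the transport must be constructed before the router and
// started afterwards.
struct Handler {
    transport: Weak<QuicTransport>,
}

impl Handler {
    async fn handle(&self, msg: Vec<u8>) {
        if let Some(transport) = self.transport.upgrade() {
            transport.push(msg).await;
        }
    }
}

fn make_handler(transport: &Arc<QuicTransport>) -> Handler {
    Handler {
        transport: Arc::downgrade(transport),
    }
}

The first wiring keeps the transport single-owner at the cost of an extra hop through the channel; the second trades that hop for shared state and contention.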
36 changes: 14 additions & 22 deletions rs/p2p/quic_transport/src/connection_handle.rs
@@ -66,41 +66,37 @@ impl ConnectionHandle {
// Propagate PeerId from this connection to lower layers.
request.extensions_mut().insert(self.peer_id);

let (mut send_stream, recv_stream) = self.connection.open_bi().await.map_err(|e| {
let (mut send_stream, recv_stream) = self.connection.open_bi().await.map_err(|err| {
self.metrics
.connection_handle_errors_total
.with_label_values(&[REQUEST_TYPE_RPC, ERROR_TYPE_OPEN]);
SendError::SendRequestFailed {
reason: e.to_string(),
}
err
})?;

write_request(&mut send_stream, request)
.await
.map_err(|e| {
.map_err(|err| {
self.metrics
.connection_handle_errors_total
.with_label_values(&[REQUEST_TYPE_RPC, ERROR_TYPE_WRITE])
.inc();
SendError::SendRequestFailed { reason: e }
err
})?;

send_stream.finish().await.map_err(|e| {
send_stream.finish().await.map_err(|err| {
self.metrics
.connection_handle_errors_total
.with_label_values(&[REQUEST_TYPE_RPC, ERROR_TYPE_FINISH])
.inc();
SendError::SendRequestFailed {
reason: e.to_string(),
}
err
})?;

let mut response = read_response(recv_stream).await.map_err(|e| {
let mut response = read_response(recv_stream).await.map_err(|err| {
self.metrics
.connection_handle_errors_total
.with_label_values(&[REQUEST_TYPE_RPC, ERROR_TYPE_READ])
.inc();
SendError::RecvResponseFailed { reason: e }
err
})?;

// Propagate PeerId from this request to upper layers.
@@ -124,33 +120,29 @@ impl ConnectionHandle {
// Propagate PeerId from this connection to lower layers.
request.extensions_mut().insert(self.peer_id);

let mut send_stream = self.connection.open_uni().await.map_err(|e| {
let mut send_stream = self.connection.open_uni().await.map_err(|err| {
self.metrics
.connection_handle_errors_total
.with_label_values(&[REQUEST_TYPE_PUSH, ERROR_TYPE_OPEN]);
SendError::SendRequestFailed {
reason: e.to_string(),
}
err
})?;

write_request(&mut send_stream, request)
.await
.map_err(|e| {
.map_err(|err| {
self.metrics
.connection_handle_errors_total
.with_label_values(&[REQUEST_TYPE_PUSH, ERROR_TYPE_WRITE])
.inc();
SendError::SendRequestFailed { reason: e }
err
})?;

send_stream.finish().await.map_err(|e| {
send_stream.finish().await.map_err(|err| {
self.metrics
.connection_handle_errors_total
.with_label_values(&[REQUEST_TYPE_PUSH, ERROR_TYPE_FINISH])
.inc();
SendError::SendRequestFailed {
reason: e.to_string(),
}
err
})?;

Ok(())
71 changes: 58 additions & 13 deletions rs/p2p/quic_transport/src/connection_manager.rs
@@ -56,10 +56,13 @@ use quinn::{
EndpointConfig, RecvStream, SendStream, VarInt,
};
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncWrite},
runtime::Handle,
select,
sync::mpsc::{channel, Receiver, Sender},
sync::oneshot,
task::JoinSet,
};
use tokio_util::time::DelayQueue;
@@ -122,6 +125,8 @@ struct ConnectionManager {
/// JoinMap that stores active connection handlers keyed by peer id.
active_connections: JoinMap<NodeId, ()>,

/// The channel is used for graceful shutdown of the connection manager.
shutdown_rx: Receiver<oneshot::Sender<()>>,
/// Endpoint config
endpoint: Endpoint,
transport_config: Arc<quinn::TransportConfig>,
@@ -188,6 +193,24 @@ impl std::fmt::Display for ConnectionEstablishError {
}
}

#[derive(Error, Debug)]
#[error("ShutdownError")]
pub(crate) struct ShutdownError;
pub(crate) struct ShutdownHandle(Sender<oneshot::Sender<()>>);

impl ShutdownHandle {
fn new() -> (Self, Receiver<oneshot::Sender<()>>) {
let (shutdown_tx, shutdown_rx) = channel(10);
(ShutdownHandle(shutdown_tx), shutdown_rx)
}

pub(crate) async fn shutdown(&mut self) -> Result<(), ShutdownError> {
let (wait_tx, wait_rx) = oneshot::channel();
self.0.send(wait_tx).await.map_err(|_| ShutdownError)?;
wait_rx.await.map_err(|_| ShutdownError)
}
}

pub(crate) fn start_connection_manager(
log: &ReplicaLogger,
metrics_registry: &MetricsRegistry,
Expand All @@ -200,7 +223,7 @@ pub(crate) fn start_connection_manager(
watcher: tokio::sync::watch::Receiver<SubnetTopology>,
socket: Either<SocketAddr, impl AsyncUdpSocket>,
router: Router,
) {
) -> ShutdownHandle {
let topology = watcher.borrow().clone();

let metrics = QuicTransportMetrics::new(metrics_registry);
@@ -294,6 +317,8 @@
.expect("Failed to create endpoint"),
};

let (shutdown_handler, shutdown_rx) = ShutdownHandle::new();

let manager = ConnectionManager {
log: log.clone(),
rt: rt.clone(),
Expand All @@ -311,10 +336,12 @@ pub(crate) fn start_connection_manager(
outbound_connecting: JoinMap::new(),
inbound_connecting: JoinSet::new(),
active_connections: JoinMap::new(),
shutdown_rx,
router,
};

rt.spawn(manager.run());
shutdown_handler
}

impl ConnectionManager {
@@ -323,8 +350,11 @@ impl ConnectionManager {
}

pub async fn run(mut self) {
loop {
let shutdown_notifier = loop {
select! {
shutdown_notifier = self.shutdown_rx.recv() => {
break shutdown_notifier;
}
Some(reconnect) = self.connect_queue.next() => {
self.handle_dial(reconnect.into_inner())
}
@@ -334,18 +364,23 @@
self.handle_topology_change();
},
Err(_) => {
error!(self.log, "Transport disconnected from peer manager. Shutting down.");
break
// If this happens it means that peer discovery is not working anymore.
// There are a few options in this case:
// 1. continue working with the existing set of connections (preferred)
// 2. panic
// 3. do a graceful shutdown and rely on clients of transport to handle this fallout
// (least preferred because this is not a recoverable error)
error!(self.log, "Transport disconnected from peer manager.");
}
}
},
connecting = self.endpoint.accept() => {
if let Some(connecting) = connecting {
self.handle_inbound(connecting);
} else {
info!(self.log, "Quic endpoint closed. Stopping transport.");
// Endpoint is closed. This indicates a shutdown.
break;
error!(self.log, "Quic endpoint closed. Stopping transport.");
// Endpoint is closed. This indicates a non-graceful shutdown.
break None;
}
},
Some(conn_res) = self.outbound_connecting.join_next() => {
@@ -392,13 +427,23 @@ impl ConnectionManager {
self.metrics
.delay_queue_size
.set(self.connect_queue.len() as i64);
}
// This point is reached only in two cases: the replica is gracefully shutting down, or
// a bug has made the peer manager unavailable.
// If the peer manager is unavailable, the replica must exit; that's why
// the endpoint is closed proactively.
self.endpoint.close(0u8.into(), b"shutting down");
};
self.shutdown().await;
// The send can fail iff the shutdown handler was dropped already.
let _ = shutdown_notifier
.expect("Transport can't stop unexpectedly. This is a serious unrecoverable bug. It is better to crash.")
.send(());
}

// TODO: maybe unbind the port so we can start another transport on the same port after shutdown.
async fn shutdown(mut self) {
self.peer_map.write().unwrap().clear();
self.endpoint
.close(VarInt::from_u32(0), b"graceful shutdown of endpoint");
self.connect_queue.clear();
self.inbound_connecting.shutdown().await;
self.outbound_connecting.shutdown().await;
self.active_connections.shutdown().await;
self.endpoint.wait_idle().await;
}
