diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index e5956278663..0520f08c04f 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -27,6 +27,10 @@ webpki = "0.22.0" x509-parser = "0.12.0" yasna = { version = "0.4.0" } +async-std = "*" +log = "*" +futures-timer = "*" + [dev-dependencies] anyhow = "1.0.41" async-std = { version = "1.9.0", features = ["attributes"] } diff --git a/transports/quic/src/connection.rs b/transports/quic/src/connection.rs new file mode 100644 index 00000000000..f65ca718af0 --- /dev/null +++ b/transports/quic/src/connection.rs @@ -0,0 +1,435 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A single QUIC connection. +//! +//! The [`Connection`] struct of this module contains, amongst other things, a +//! [`quinn_proto::Connection`] state machine and an `Arc`. This struct is responsible +//! for communication between quinn_proto's connection and its associated endpoint. +//! All interactions with a QUIC connection should be done through this struct. +// TODO: docs + +use crate::endpoint::Endpoint; + +use futures::{channel::mpsc, prelude::*}; +use std::{ + fmt, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; + +/// Underlying structure for both [`crate::QuicMuxer`] and [`crate::Upgrade`]. +/// +/// Contains everything needed to process a connection with a remote. +/// Tied to a specific [`crate::Endpoint`]. +pub(crate) struct Connection { + /// Endpoint this connection belongs to. + endpoint: Arc, + /// Future whose job is to send a message to the endpoint. Only one at a time. + pending_to_endpoint: Option + Send + Sync>>>, + /// Events that the endpoint will send in destination to our local [`quinn_proto::Connection`]. + /// Passed at initialization. + from_endpoint: mpsc::Receiver, + + /// The QUIC state machine for this specific connection. + connection: quinn_proto::Connection, + /// Identifier for this connection according to the endpoint. Used when sending messages to + /// the endpoint. + connection_id: quinn_proto::ConnectionHandle, + /// `Future` that triggers at the `Instant` that `self.connection.poll_timeout()` indicates. + next_timeout: Option, + + /// In other to avoid race conditions where a "connected" event happens if we were not + /// handshaking, we cache whether the connection is handshaking and only set this to true + /// after a "connected" event has been received. + /// + /// In other words, this flag indicates whether a "connected" hasn't been received yet. + is_handshaking: bool, + /// Contains a `Some` if the connection is closed, with the reason of the closure. + /// Contains `None` if it is still open. + /// Contains `Some` if and only if a `ConnectionLost` event has been emitted. + closed: Option, +} + +/// Error on the connection as a whole. +#[derive(Debug, Clone, thiserror::Error)] +pub enum Error { + /// Endpoint has force-killed this connection because it was too busy. + #[error("Endpoint has force-killed our connection")] + ClosedChannel, + /// Error in the inner state machine. + #[error("{0}")] + Quinn(#[from] quinn_proto::ConnectionError), +} + +impl Connection { + /// Crate-internal function that builds a [`Connection`] from raw components. + /// + /// This function assumes that there exists a background task that will process the messages + /// sent to `to_endpoint` and send us messages on `from_endpoint`. + /// + /// The `from_endpoint` can be purposefully closed by the endpoint if the connection is too + /// slow to process. + // TODO: is this necessary ^? figure out if quinn_proto doesn't forbid that situation in the first place + /// + /// `connection_id` is used to identify the local connection in the messages sent to + /// `to_endpoint`. + /// + /// This function assumes that the [`quinn_proto::Connection`] is completely fresh and none of + /// its methods has ever been called. Failure to comply might lead to logic errors and panics. + // TODO: maybe abstract `to_endpoint` more and make it generic? dunno + pub(crate) fn from_quinn_connection( + endpoint: Arc, + connection: quinn_proto::Connection, + connection_id: quinn_proto::ConnectionHandle, + from_endpoint: mpsc::Receiver, + ) -> Self { + assert!(!connection.is_closed()); + let is_handshaking = connection.is_handshaking(); + + Connection { + endpoint, + pending_to_endpoint: None, + connection, + next_timeout: None, + from_endpoint, + connection_id, + is_handshaking, + closed: None, + } + } + + /// Returns the certificates sent by the remote through the underlying TLS session. + /// Returns `None` if the connection is still handshaking. + // TODO: it seems to happen that is_handshaking is false but this returns None + pub(crate) fn peer_certificates( + &self, + ) -> Option> { + self.connection + .crypto_session() + .get_peer_certificates() + .map(|l| l.into_iter().map(|l| l.into())) + } + + /// Returns the address of the node we're connected to. + // TODO: can change /!\ + pub(crate) fn remote_addr(&self) -> SocketAddr { + self.connection.remote_address() + } + + /// Returns `true` if this connection is still pending. Returns `false` if we are connected to + /// the remote or if the connection is closed. + pub(crate) fn is_handshaking(&self) -> bool { + self.is_handshaking + } + + /// If the connection is closed, returns why. If the connection is open, returns `None`. + /// + /// > **Note**: This method is also the main way to determine whether a connection is closed. + pub(crate) fn close_reason(&self) -> Option<&Error> { + debug_assert!(!self.is_handshaking); + self.closed.as_ref() + } + + /// Start closing the connection. A [`ConnectionEvent::ConnectionLost`] event will be + /// produced in the future. + pub(crate) fn close(&mut self) { + // TODO: what if the user calls this multiple times? + // We send a dummy `0` error code with no message, as the API of StreamMuxer doesn't + // support this. + self.connection + .close(Instant::now(), From::from(0u32), Default::default()); + } + + /// Pops a new substream opened by the remote. + /// + /// If `None` is returned, then a [`ConnectionEvent::StreamAvailable`] event will later be + /// produced when a substream is available. + pub(crate) fn pop_incoming_substream(&mut self) -> Option { + self.connection.accept(quinn_proto::Dir::Bi) + } + + /// Pops a new substream opened locally. + /// + /// The API can be thought as if outgoing substreams were automatically opened by the local + /// QUIC connection and were added to a queue for availability. + /// + /// If `None` is returned, then a [`ConnectionEvent::StreamOpened`] event will later be + /// produced when a substream is available. + pub(crate) fn pop_outgoing_substream(&mut self) -> Option { + self.connection.open(quinn_proto::Dir::Bi) + } + + /// Reads data from the given substream. Similar to the API of `std::io::Read`. + /// + /// If `Err(ReadError::Blocked)` is returned, then a [`ConnectionEvent::StreamReadable`] event + /// will later be produced when the substream has readable data. A + /// [`ConnectionEvent::StreamStopped`] event can also be emitted. + pub(crate) fn read_substream( + &mut self, + id: quinn_proto::StreamId, + buf: &mut [u8], + ) -> Result { + self.connection.read(id, buf).map(|n| { + // `n` is `None` in case of EOF. + // See https://github.com/quinn-rs/quinn/blob/9aa3bde3aa1319b2c743f792312508de9270b8c6/quinn/src/streams.rs#L367-L370 + debug_assert_ne!(n, Some(0)); // Sanity check + n.unwrap_or(0) + }) + } + + /// Writes data to the given substream. Similar to the API of `std::io::Write`. + /// + /// If `Err(WriteError::Blocked)` is returned, then a [`ConnectionEvent::StreamWritable`] event + /// will later be produced when the substream can be written to. A + /// [`ConnectionEvent::StreamStopped`] event can also be emitted. + pub(crate) fn write_substream( + &mut self, + id: quinn_proto::StreamId, + buf: &[u8], + ) -> Result { + self.connection.write(id, buf) + } + + /// Closes the given substream. + /// + /// [`Connection::write_substream`] must no longer be called. The substream is however still + /// readable. + /// + /// On success, a [`ConnectionEvent::StreamFinished`] event will later be produced when the + /// substream has been effectively closed. A [`ConnectionEvent::StreamStopped`] event can also + /// be emitted. + pub(crate) fn shutdown_substream( + &mut self, + id: quinn_proto::StreamId, + ) -> Result<(), quinn_proto::FinishError> { + self.connection.finish(id) + } + + /// Polls the connection for an event that happend on it. + pub(crate) fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll { + // Nothing more can be done if the connection is closed. + // Return `Pending` without registering the waker, essentially freezing the task forever. + if self.closed.is_some() { + return Poll::Pending; + } + + // Process events that the endpoint has sent to us. + loop { + match Pin::new(&mut self.from_endpoint).poll_next(cx) { + Poll::Ready(Some(event)) => self.connection.handle_event(event), + Poll::Ready(None) => { + debug_assert!(self.closed.is_none()); + let err = Error::ClosedChannel; + self.closed = Some(err.clone()); + return Poll::Ready(ConnectionEvent::ConnectionLost(err)); + } + Poll::Pending => break, + } + } + + 'send_pending: loop { + // Sending the pending event to the endpoint. If the endpoint is too busy, we just + // stop the processing here. + // There is a bit of a question in play here: should we continue to accept events + // through `from_endpoint` if `to_endpoint` is busy? + // We need to be careful to avoid a potential deadlock if both `from_endpoint` and + // `to_endpoint` are full. As such, we continue to transfer data from `from_endpoint` + // to the `quinn_proto::Connection` (see above). + // However we don't deliver substream-related events to the user as long as + // `to_endpoint` is full. This should propagate the back-pressure of `to_endpoint` + // being full to the user. + if let Some(pending_to_endpoint) = &mut self.pending_to_endpoint { + match Future::poll(Pin::new(pending_to_endpoint), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(()) => self.pending_to_endpoint = None, + } + } + + let now = Instant::now(); + + // Poll the connection for packets to send on the UDP socket and try to send them on + // `to_endpoint`. + while let Some(transmit) = self.connection.poll_transmit(now) { + let endpoint = self.endpoint.clone(); + debug_assert!(self.pending_to_endpoint.is_none()); + self.pending_to_endpoint = Some(Box::pin(async move { + // TODO: ECN bits not handled + endpoint + .send_udp_packet(transmit.destination, transmit.contents) + .await; + })); + continue 'send_pending; + } + + // The connection also needs to be able to send control messages to the endpoint. This is + // handled here, and we try to send them on `to_endpoint` as well. + while let Some(endpoint_event) = self.connection.poll_endpoint_events() { + let endpoint = self.endpoint.clone(); + let connection_id = self.connection_id; + debug_assert!(self.pending_to_endpoint.is_none()); + self.pending_to_endpoint = Some(Box::pin(async move { + endpoint + .report_quinn_event(connection_id, endpoint_event) + .await; + })); + continue 'send_pending; + } + + // Timeout system. + // We break out of the following loop until if `poll_timeout()` returns `None` or if + // polling `self.next_timeout` returns `Poll::Pending`. + loop { + if let Some(next_timeout) = &mut self.next_timeout { + match Future::poll(Pin::new(next_timeout), cx) { + Poll::Ready(()) => { + self.connection.handle_timeout(now); + self.next_timeout = None; + } + Poll::Pending => break, + } + } else if let Some(when) = self.connection.poll_timeout() { + if when <= now { + self.connection.handle_timeout(now); + } else { + let delay = when - now; + self.next_timeout = Some(futures_timer::Delay::new(delay)); + } + } else { + break; + } + } + + // The final step consists in handling the events related to the various substreams. + while let Some(event) = self.connection.poll() { + match event { + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Opened { + dir: quinn_proto::Dir::Uni, + }) + | quinn_proto::Event::Stream(quinn_proto::StreamEvent::Available { + dir: quinn_proto::Dir::Uni, + }) + | quinn_proto::Event::DatagramReceived => { + // We don't use datagrams or unidirectional streams. If these events + // happen, it is by some code not compatible with libp2p-quic. + self.connection + .close(Instant::now(), From::from(0u32), Default::default()); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Readable { id }) => { + return Poll::Ready(ConnectionEvent::StreamReadable(id)); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Writable { id }) => { + return Poll::Ready(ConnectionEvent::StreamWritable(id)); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Stopped { + id, .. + }) => { + // The `Stop` QUIC event is more or less similar to a `Reset`, except that + // it applies only on the writing side of the pipe. + return Poll::Ready(ConnectionEvent::StreamStopped(id)); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Available { + dir: quinn_proto::Dir::Bi, + }) => { + return Poll::Ready(ConnectionEvent::StreamAvailable); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Opened { + dir: quinn_proto::Dir::Bi, + }) => { + return Poll::Ready(ConnectionEvent::StreamOpened); + } + quinn_proto::Event::ConnectionLost { reason } => { + debug_assert!(self.closed.is_none()); + self.is_handshaking = false; + let err = Error::Quinn(reason); + self.closed = Some(err.clone()); + return Poll::Ready(ConnectionEvent::ConnectionLost(err)); + } + quinn_proto::Event::Stream(quinn_proto::StreamEvent::Finished { + id, + }) => { + return Poll::Ready(ConnectionEvent::StreamFinished(id)); + } + quinn_proto::Event::Connected => { + debug_assert!(self.is_handshaking); + debug_assert!(!self.connection.is_handshaking()); + self.is_handshaking = false; + return Poll::Ready(ConnectionEvent::Connected); + } + quinn_proto::Event::HandshakeDataReady => { + debug_assert!(self.is_handshaking); + debug_assert!(self.connection.is_handshaking()); + } + } + } + + break; + } + + Poll::Pending + } +} + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Connection").finish() + } +} + +impl Drop for Connection { + fn drop(&mut self) { + // TODO: don't do that if already drained + // We send a message to the endpoint. + self.endpoint.report_quinn_event_non_block(self.connection_id, quinn_proto::EndpointEvent::drained()); + } +} + +/// Event generated by the [`Connection`]. +#[derive(Debug)] +pub(crate) enum ConnectionEvent { + /// Now connected to the remote and certificates are available. + Connected, + + /// Connection has been closed and can no longer be used. + ConnectionLost(Error), + + /// Generated after [`Connection::pop_incoming_substream`] has been called and has returned + /// `None`. After this event has been generated, this method is guaranteed to return `Some`. + StreamAvailable, + /// Generated after [`Connection::pop_outgoing_substream`] has been called and has returned + /// `None`. After this event has been generated, this method is guaranteed to return `Some`. + StreamOpened, + + /// Generated after [`Connection::read_substream`] has been called and has returned a + /// `Blocked` error. + StreamReadable(quinn_proto::StreamId), + /// Generated after [`Connection::write_substream`] has been called and has returned a + /// `Blocked` error. + StreamWritable(quinn_proto::StreamId), + + /// Generated after [`Connection::shutdown_substream`] has been called. + StreamFinished(quinn_proto::StreamId), + /// A substream has been stopped. This concept is similar to the concept of a substream being + /// "reset", as in a TCP socket being reset for example. + StreamStopped(quinn_proto::StreamId), +} diff --git a/transports/quic/src/crypto.rs b/transports/quic/src/crypto.rs deleted file mode 100644 index de4e482373a..00000000000 --- a/transports/quic/src/crypto.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2021 David Craven. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use libp2p_core::identity::Keypair; -use quinn_proto::TransportConfig; -use std::sync::Arc; - -pub struct CryptoConfig { - pub keypair: Keypair, - pub keylogger: Option>, - pub transport: Arc, -} - -#[derive(Clone, Copy, Debug)] -pub(crate) struct TlsCrypto; - -impl TlsCrypto { - pub fn new_server_config(config: &CryptoConfig) -> Arc { - let mut server = crate::tls::make_server_config(&config.keypair).expect("invalid config"); - if let Some(key_log) = config.keylogger.clone() { - server.key_log = key_log; - } - Arc::new(server) - } - - pub fn new_client_config(config: &CryptoConfig) -> Arc { - let mut client = crate::tls::make_client_config(&config.keypair).expect("invalid config"); - if let Some(key_log) = config.keylogger.clone() { - client.key_log = key_log; - } - Arc::new(client) - } - - pub fn keylogger() -> Arc { - Arc::new(rustls::KeyLogFile::new()) - } -} diff --git a/transports/quic/src/endpoint.rs b/transports/quic/src/endpoint.rs index 50e01c5ae38..1ac98f1efee 100644 --- a/transports/quic/src/endpoint.rs +++ b/transports/quic/src/endpoint.rs @@ -1,4 +1,4 @@ -// Copyright 2021 David Craven. +// Copyright 2017-2020 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,439 +18,595 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::crypto::{CryptoConfig, TlsCrypto}; -use crate::muxer::QuicMuxer; -use crate::{QuicConfig, QuicError}; -use futures::channel::{mpsc, oneshot}; -use futures::prelude::*; -use quinn_proto::{ - ClientConfig as QuinnClientConfig, ConnectionEvent, ConnectionHandle, DatagramEvent, - EcnCodepoint, Endpoint as QuinnEndpoint, EndpointEvent, ServerConfig as QuinnServerConfig, - Transmit, +//! Background task dedicated to manage the QUIC state machine. +//! +//! Considering that all QUIC communications happen over a single UDP socket, one needs to +//! maintain a unique synchronization point that holds the state of all the active connections. +//! +//! The [`Endpoint`] object represents this synchronization point. It maintains a background task +//! whose role is to interface with the UDP socket. Communication between the background task and +//! the rest of the code only happens through channels. See the documentation of the +//! [`background_task`] for a thorough description. + +use crate::{connection::Connection, x509}; + +use async_std::net::SocketAddr; +use futures::{ + channel::{mpsc, oneshot}, + lock::Mutex, + prelude::*, +}; +use libp2p_core::multiaddr::Multiaddr; +use std::{ + collections::{HashMap, VecDeque}, + fmt, io, + sync::{Arc, Weak}, + task::Poll, + time::{Duration, Instant}, }; -use std::collections::{HashMap, VecDeque}; -use std::io::IoSliceMut; -use std::mem::MaybeUninit; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Instant; -use udp_socket::{RecvMeta, SocketType, UdpCapabilities, UdpSocket, BATCH_SIZE}; - -/// Message sent to the endpoint background task. -#[derive(Debug)] -enum ToEndpoint { - /// Instructs the endpoint to start connecting to the given address. - Dial { - /// UDP address to connect to. - addr: SocketAddr, - /// Channel to return the result of the dialing to. - tx: oneshot::Sender>, - }, - /// Sent by a `quinn_proto` connection when the endpoint needs to process an event generated - /// by a connection. The event itself is opaque to us. - ConnectionEvent { - connection_id: ConnectionHandle, - event: EndpointEvent, - }, - /// Instruct the endpoint to transmit a packet on its UDP socket. - Transmit(Transmit), -} - -#[derive(Debug)] -pub struct TransportChannel { - tx: mpsc::UnboundedSender, - rx: mpsc::Receiver>, - port: u16, - ty: SocketType, -} - -impl TransportChannel { - pub fn dial(&mut self, addr: SocketAddr) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - let msg = ToEndpoint::Dial { addr, tx }; - self.tx.unbounded_send(msg).expect("endpoint has crashed"); - rx - } - - pub fn poll_incoming( - &mut self, - cx: &mut Context, - ) -> Poll>> { - Pin::new(&mut self.rx).poll_next(cx) - } - - pub fn port(&self) -> u16 { - self.port - } - - pub fn ty(&self) -> SocketType { - self.ty - } -} -#[derive(Debug)] -pub struct ConnectionChannel { - id: ConnectionHandle, - tx: mpsc::UnboundedSender, - rx: mpsc::Receiver, - port: u16, - max_datagrams: usize, +/// Represents the configuration for the [`Endpoint`]. +#[derive(Debug, Clone)] +pub struct Config { + /// The client configuration to pass to `quinn_proto`. + client_config: quinn_proto::ClientConfig, + /// The server configuration to pass to `quinn_proto`. + server_config: Arc, + /// The endpoint configuration to pass to `quinn_proto`. + endpoint_config: Arc, + /// The [`Multiaddr`] to use to spawn the UDP socket. + multiaddr: Multiaddr, } -impl ConnectionChannel { - pub fn poll_channel_events(&mut self, cx: &mut Context) -> Poll { - match Pin::new(&mut self.rx).poll_next(cx) { - Poll::Ready(Some(event)) => Poll::Ready(event), - Poll::Ready(None) => panic!("endpoint has crashed"), - Poll::Pending => Poll::Pending, - } - } - - pub fn send_endpoint_event(&mut self, event: EndpointEvent) { - let msg = ToEndpoint::ConnectionEvent { - connection_id: self.id, - event, - }; - self.tx.unbounded_send(msg).expect("endpoint has crashed") - } - - pub fn send_transmit(&mut self, transmit: Transmit) { - let msg = ToEndpoint::Transmit(transmit); - self.tx.unbounded_send(msg).expect("endpoint has crashed") - } - - pub fn port(&self) -> u16 { - self.port - } - - pub fn max_datagrams(&self) -> usize { - self.max_datagrams +impl Config { + /// Creates a new configuration object with default values. + pub fn new( + keypair: &libp2p_core::identity::Keypair, + multiaddr: Multiaddr, + ) -> Result { + let mut transport = quinn_proto::TransportConfig::default(); + transport.stream_window_uni(0).unwrap(); // Can only panic if value is out of range. + transport.datagram_receive_buffer_size(None); + transport.keep_alive_interval(Some(Duration::from_millis(10))); + let transport = Arc::new(transport); + let (client_tls_config, server_tls_config) = x509::make_tls_config(keypair)?; + let mut server_config = quinn_proto::ServerConfig::default(); + server_config.transport = transport.clone(); + server_config.crypto = Arc::new(server_tls_config); + let mut client_config = quinn_proto::ClientConfig::default(); + client_config.transport = transport; + client_config.crypto = Arc::new(client_tls_config); + Ok(Self { + client_config, + server_config: Arc::new(server_config), + endpoint_config: Default::default(), + multiaddr: multiaddr, + }) } } -#[derive(Debug)] -struct EndpointChannel { - rx: mpsc::UnboundedReceiver, - tx: mpsc::Sender>, - port: u16, - max_datagrams: usize, - connection_tx: mpsc::UnboundedSender, +/// Object containing all the QUIC resources shared between all connections. +// TODO: expand docs +// TODO: Debug trait +// TODO: remove useless fields +pub struct Endpoint { + /// Channel to the background of the endpoint. + /// See [`Endpoint::new_connections`] (just below) for a commentary about the mutex. + to_endpoint: Mutex>, + + /// Channel where new connections are being sent. + /// This is protected by a futures-friendly `Mutex`, meaning that receiving a connection is + /// done in two steps: locking this mutex, and grabbing the next element on the `Receiver`. + /// The only consequence of this `Mutex` is that multiple simultaneous calls to + /// [`Endpoint::next_incoming`] are serialized. + new_connections: Mutex>, + + /// Copy of [`Endpoint::to_endpoint`], except not behind a `Mutex`. Used if we want to be + /// guaranteed a slot in the messages buffer. + to_endpoint2: mpsc::Sender, + + /// Configuration passed at initialization. + // TODO: remove? + config: Config, + /// Multiaddr of the local UDP socket passed in the configuration at initialization after it + /// has potentially been modified to handle port number `0`. + local_multiaddr: Multiaddr, } -impl EndpointChannel { - pub fn poll_next_event(&mut self, cx: &mut Context) -> Poll> { - Pin::new(&mut self.rx).poll_next(cx) - } - - pub fn create_connection( - &self, - id: ConnectionHandle, - ) -> (ConnectionChannel, mpsc::Sender) { - let (tx, rx) = mpsc::channel(12); - let channel = ConnectionChannel { - id, - tx: self.connection_tx.clone(), - rx, - port: self.port, - max_datagrams: self.max_datagrams, +impl Endpoint { + /// Builds a new `Endpoint`. + pub fn new(config: Config) -> Result, io::Error> { + let local_socket_addr = match crate::transport::multiaddr_to_socketaddr(&config.multiaddr) { + Ok(a) => a, + Err(()) => panic!(), // TODO: Err(TransportError::MultiaddrNotSupported(multiaddr)), }; - (channel, tx) - } -} - -pub struct EndpointConfig { - socket: UdpSocket, - endpoint: QuinnEndpoint, - port: u16, - crypto_config: Arc, - capabilities: UdpCapabilities, -} - -impl EndpointConfig { - pub fn new(mut config: QuicConfig, addr: SocketAddr) -> Result { - config.transport.max_concurrent_uni_streams(0u32.into()); - config.transport.datagram_receive_buffer_size(None); - let transport = Arc::new(config.transport); - let crypto_config = Arc::new(CryptoConfig { - keypair: config.keypair, - keylogger: config.keylogger, - transport: transport.clone(), + // NOT blocking, as per man:bind(2), as we pass an IP address. + let socket = std::net::UdpSocket::bind(&local_socket_addr)?; + // TODO: + /*let port_is_zero = local_socket_addr.port() == 0; + let local_socket_addr = socket.local_addr()?; + if port_is_zero { + assert_ne!(local_socket_addr.port(), 0); + assert_eq!(multiaddr.pop(), Some(Protocol::Quic)); + assert_eq!(multiaddr.pop(), Some(Protocol::Udp(0))); + multiaddr.push(Protocol::Udp(local_socket_addr.port())); + multiaddr.push(Protocol::Quic); + }*/ + + let (to_endpoint_tx, to_endpoint_rx) = mpsc::channel(32); + let to_endpoint2 = to_endpoint_tx.clone(); + let (new_connections_tx, new_connections_rx) = mpsc::channel(1); + + let endpoint = Arc::new(Endpoint { + to_endpoint: Mutex::new(to_endpoint_tx), + to_endpoint2, + new_connections: Mutex::new(new_connections_rx), + config: config.clone(), + local_multiaddr: config.multiaddr.clone(), // TODO: no }); - let crypto = TlsCrypto::new_server_config(&crypto_config); - let mut server_config = QuinnServerConfig::with_crypto(crypto); - server_config.transport = transport; + // TODO: just for testing, do proper task spawning + async_std::task::spawn(background_task( + config.clone(), + Arc::downgrade(&endpoint), + async_std::net::UdpSocket::from(socket), + new_connections_tx, + to_endpoint_rx.fuse(), + )); + + Ok(endpoint) + + // TODO: IP address stuff + /*if socket_addr.ip().is_unspecified() { + info!("returning all local IPs for unspecified address"); + let suffixes = [Protocol::Udp(socket_addr.port()), Protocol::Quic]; + let local_addresses = + host_addresses(&suffixes).map_err(|e| TransportError::Other(Error::IO(e)))?; + for (_, _, address) in local_addresses { + info!("sending address {:?}", address); + new_connections + .unbounded_send(ListenerEvent::NewAddress(address)) + .expect("we have a reference to the peer, so this will not fail; qed") + } + } else { + info!("sending address {:?}", multiaddr); + new_connections + .unbounded_send(ListenerEvent::NewAddress(multiaddr.clone())) + .expect("we have a reference to the peer, so this will not fail; qed"); + } - let endpoint = QuinnEndpoint::new(Default::default(), Some(Arc::new(server_config))); + if socket_addr.ip().is_unspecified() { + debug!("returning all local IPs for unspecified address"); + let local_addresses = + host_addresses(&[Protocol::Udp(socket_addr.port()), Protocol::Quic]) + .map_err(|e| TransportError::Other(Error::IO(e)))?; + for i in local_addresses { + info!("sending address {:?}", i.2); + reference + .new_connections + .unbounded_send(ListenerEvent::NewAddress(i.2)) + .expect("we have a reference to the peer, so this will not fail; qed") + } + } else { + info!("sending address {:?}", multiaddr); + reference + .new_connections + .unbounded_send(ListenerEvent::NewAddress(multiaddr)) + .expect("we have a reference to the peer, so this will not fail; qed"); + } - let socket = UdpSocket::bind(addr)?; - let port = socket.local_addr()?.port(); - let capabilities = UdpSocket::capabilities()?; - Ok(Self { - socket, - endpoint, - port, - crypto_config, - capabilities, - }) + let endpoint = EndpointRef { reference, channel }; + let join_handle = spawn(endpoint.clone()); + Ok((Self(endpoint), join_handle))*/ } - pub fn spawn(self) -> TransportChannel { - let (tx1, rx1) = mpsc::unbounded(); - let (tx2, rx2) = mpsc::channel(1); - let transport = TransportChannel { - tx: tx1, - rx: rx2, - port: self.port, - ty: self.socket.socket_type(), - }; - let endpoint = EndpointChannel { - tx: tx2, - rx: rx1, - port: self.port, - max_datagrams: self.capabilities.max_gso_segments, - connection_tx: transport.tx.clone(), - }; - async_global_executor::spawn(Endpoint::new(endpoint, self)).detach(); - transport + /// Asks the endpoint to start dialing the given address. + /// + /// Note that this method only *starts* the dialing. `Ok` is returned as soon as possible, even + /// when the remote might end up being unreachable. + pub(crate) async fn dial( + &self, + addr: SocketAddr, + ) -> Result { + // The two `expect`s below can panic if the background task has stopped. The background + // task can stop only if the `Endpoint` is destroyed or if the task itself panics. In other + // words, we panic here iff a panic has already happened somewhere else, which is a + // reasonable thing to do. + let (tx, rx) = oneshot::channel(); + self.to_endpoint + .lock().await + .send(ToEndpoint::Dial { addr, result: tx }) + .await + .expect("background task has crashed"); + rx.await.expect("background task has crashed") } -} - -struct Endpoint { - channel: EndpointChannel, - endpoint: QuinnEndpoint, - socket: UdpSocket, - crypto_config: Arc, - connections: HashMap>, - outgoing: VecDeque, - recv_buf: Box<[u8]>, - incoming_slot: Option, - event_slot: Option<(ConnectionHandle, ConnectionEvent)>, -} -impl Endpoint { - pub fn new(channel: EndpointChannel, config: EndpointConfig) -> Self { - let max_udp_payload_size = config - .endpoint - .config() - .get_max_udp_payload_size() - .min(u16::MAX as _) as usize; - let recv_buf = vec![0; max_udp_payload_size * BATCH_SIZE].into_boxed_slice(); - Self { - channel, - endpoint: config.endpoint, - socket: config.socket, - crypto_config: config.crypto_config, - connections: Default::default(), - outgoing: Default::default(), - recv_buf, - incoming_slot: None, - event_slot: None, - } + /// Tries to pop a new incoming connection from the queue. + pub(crate) async fn next_incoming(&self) -> Connection { + // The `expect` below can panic if the background task has stopped. The background task + // can stop only if the `Endpoint` is destroyed or if the task itself panics. In other + // words, we panic here iff a panic has already happened somewhere else, which is a + // reasonable thing to do. + let mut new_connections = self.new_connections.lock().await; + new_connections + .next() + .await + .expect("background task has crashed") } - pub fn transmit(&mut self, transmit: Transmit) { - let ecn = transmit - .ecn - .map(|ecn| udp_socket::EcnCodepoint::from_bits(ecn as u8)) - .unwrap_or_default(); - let transmit = udp_socket::Transmit { - destination: transmit.destination, - contents: transmit.contents, - ecn, - segment_size: transmit.segment_size, - src_ip: transmit.src_ip, - }; - self.outgoing.push_back(transmit); + /// Asks the endpoint to send a UDP packet. + /// + /// Note that this method only queues the packet and returns as soon as the packet is in queue. + /// There is no guarantee that the packet will actually be sent, but considering that this is + /// a UDP packet, you cannot rely on the packet being delivered anyway. + pub(crate) async fn send_udp_packet( + &self, + destination: SocketAddr, + data: impl Into>, + ) { + let _ = self + .to_endpoint + .lock().await + .send(ToEndpoint::SendUdpPacket { + destination, + data: data.into(), + }) + .await; } - fn send_incoming(&mut self, muxer: QuicMuxer, cx: &mut Context) -> bool { - assert!(self.incoming_slot.is_none()); - match self.channel.tx.poll_ready(cx) { - Poll::Pending => { - self.incoming_slot = Some(muxer); - true - } - Poll::Ready(Ok(())) => { - self.channel.tx.try_send(Ok(muxer)).ok(); - false - } - Poll::Ready(_err) => false, - } + /// Report to the endpoint an event on a [`quinn_proto::Connection`]. + /// + /// This is typically called by a [`Connection`]. + /// + /// If `event.is_drained()` is true, the event indicates that the connection no longer exists. + /// This must therefore be the last event sent using this [`quinn_proto::ConnectionHandle`]. + pub(crate) async fn report_quinn_event( + &self, + connection_id: quinn_proto::ConnectionHandle, + event: quinn_proto::EndpointEvent, + ) { + self.to_endpoint + .lock().await + .send(ToEndpoint::ProcessConnectionEvent { + connection_id, + event, + }) + .await + .expect("background task has crashed"); } - fn send_event( - &mut self, - id: ConnectionHandle, - event: ConnectionEvent, - cx: &mut Context, - ) -> bool { - assert!(self.event_slot.is_none()); - let conn = self.connections.get_mut(&id).unwrap(); - match conn.poll_ready(cx) { - Poll::Pending => { - self.event_slot = Some((id, event)); - true - } - Poll::Ready(Ok(())) => { - conn.try_send(event).ok(); - false - } - Poll::Ready(_err) => false, - } + /// Similar to [`Endpoint::report_quinn_event`], except that the message sending is guaranteed + /// to be instantaneous and to succeed. + /// + /// This method bypasses back-pressure mechanisms and is meant to be called only from + /// destructors, where waiting is not advisable. + pub(crate) fn report_quinn_event_non_block( + &self, + connection_id: quinn_proto::ConnectionHandle, + event: quinn_proto::EndpointEvent, + ) { + // We implement this by cloning the `mpsc::Sender`. Since each sender is guaranteed a slot + // in the buffer, cloning the sender reserves the slot and sending thus always succeeds. + let result = self.to_endpoint2 + .clone() + .try_send(ToEndpoint::ProcessConnectionEvent { + connection_id, + event, + }); + assert!(result.is_ok()); } } -impl Future for Endpoint { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let me = Pin::into_inner(self); - - if let Some(muxer) = me.incoming_slot.take() { - if !me.send_incoming(muxer, cx) { - tracing::info!("cleared incoming slot"); - } - } +/// Message sent to the endpoint background task. +#[derive(Debug)] +enum ToEndpoint { + /// Instruct the endpoint to start connecting to the given address. + Dial { + /// UDP address to connect to. + addr: SocketAddr, + /// Channel to return the result of the dialing to. + result: oneshot::Sender>, + }, + /// Sent by a `quinn_proto` connection when the endpoint needs to process an event generated + /// by a connection. The event itself is opaque to us. Only `quinn_proto` knows what is in + /// there. + ProcessConnectionEvent { + connection_id: quinn_proto::ConnectionHandle, + event: quinn_proto::EndpointEvent, + }, + /// Instruct the endpoint to send a packet of data on its UDP socket. + SendUdpPacket { + /// Destination of the UDP packet. + destination: SocketAddr, + /// Packet of data to send. + data: Vec, + }, +} - if let Some((id, event)) = me.event_slot.take() { - if !me.send_event(id, event, cx) { - tracing::info!("cleared event slot"); +/// Task that runs in the background for as long as the endpont is alive. Responsible for +/// processing messages and the UDP socket. +/// +/// The `receiver` parameter must be the receiving side of the `Endpoint::to_endpoint` sender. +/// +/// # Behaviour +/// +/// This background task is responsible for the following: +/// +/// - Sending packets on the UDP socket. +/// - Receiving packets from the UDP socket and feed them to the [`quinn_proto::Endpoint`] state +/// machine. +/// - Transmitting events generated by the [`quinn_proto::Endpoint`] to the corresponding +/// [`Connection`]. +/// - Receiving messages from the `receiver` and processing the requested actions. This includes +/// UDP packets to send and events emitted by the [`Connection`] objects. +/// - Sending new connections on `new_connections`. +/// +/// When it comes to channels, there exists three main multi-producer-single-consumer channels +/// in play: +/// +/// - One channel, represented by `Endpoint::to_endpoint` and `receiver`, that communicates +/// messages from [`Endpoint`] to the background task and from the [`Connection`] to the +/// background task. +/// - One channel per each existing connection that communicates messages from the background +/// task to that [`Connection`]. +/// - One channel for the background task to send newly-opened connections to. The receiving +/// side is normally processed by a "listener" as defined by the [`libp2p_core::Transport`] +/// trait. +/// +/// In order to avoid an unbounded buffering of events, we prioritize sending data on the UDP +/// socket over everything else. If the network interface is too busy to process our packets, +/// everything comes to a freeze (including receiving UDP packets) until it is ready to accept +/// more. +/// +/// Apart from freezing when the network interface is too busy, the background task should sleep +/// as little as possible. It is in particular important for the `receiver` to be drained as +/// quickly as possible in order to avoid unnecessary back-pressure on the [`Connection`] objects. +/// +/// ## Back-pressure on `new_connections` +/// +/// The [`quinn_proto::Endpoint`] object contains an accept buffer, in other words a buffer of the +/// incoming connections waiting to be accepted. When a new connection is signalled, we send this +/// new connection on the `new_connections` channel in an asynchronous way, and we only free a slot +/// in the accept buffer once the element has actually been enqueued on `new_connections`. There +/// are therefore in total three buffers in play: the `new_connections` channel itself, the queue +/// of elements being sent on `new_connections`, and the accept buffer of the +/// [`quinn_proto::Endpoint`]. +/// +/// Unfortunately, this design has the consequence that, on the network layer, we will accept a +/// certain number of incoming connections even if [`Endpoint::next_incoming`] is never even +/// called. The `quinn-proto` library doesn't provide any way to not accept incoming connections +/// apart from filling the accept buffer. +/// +/// ## Back-pressure on connections +/// +/// Because connections are processed by the user at a rate of their choice, we cannot properly +/// handle the situation where the channel from the background task to individual connections is +/// full. Sleeping the task while waiting for the connection to be processed by the user could +/// even lead to a deadlock if this processing is also sleeping waiting for some other action that +/// itself depends on the background task (e.g. if processing the connection is waiting for a +/// message arriving on a different connection). +/// +/// In an ideal world, we would handle a background-task-to-connection channel being full by +/// dropping UDP packets destined to this connection, as a way to back-pressure the remote. +/// Unfortunately, the `quinn-proto` library doesn't provide any way for us to know which +/// connection a UDP packet is destined for before it has been turned into a [`ConnectionEvent`], +/// and because these [`ConnectionEvent`]s are sometimes used to synchronize the states of the +/// endpoint and connection, it would be a logic error to silently drop them. +/// +/// We handle this tricky situation by simply killing connections as soon as their associated +/// channel is full. +/// +// TODO: actually implement the killing of connections if channel is full, at the moment we just +// wait +/// # Shutdown +/// +/// The background task shuts down if `endpoint_weak`, `receiver` or `new_connections` become +/// disconnected/invalid. This corresponds to the lifetime of the associated [`Endpoint`]. +/// +/// Keep in mind that we pass an `Arc` whenever we create a new connection, which +/// guarantees that the [`Endpoint`], and therefore the background task, is properly kept alive +/// for as long as any QUIC connection is open. +/// +async fn background_task( + config: Config, + endpoint_weak: Weak, + udp_socket: async_std::net::UdpSocket, + mut new_connections: mpsc::Sender, + mut receiver: stream::Fuse>, +) { + // The actual QUIC state machine. + let mut endpoint = quinn_proto::Endpoint::new( + config.endpoint_config.clone(), + Some(config.server_config.clone()), + ); + + // List of all active connections, with a sender to notify them of events. + let mut alive_connections = HashMap::>::new(); + + // Buffer where we write packets received from the UDP socket. + let mut socket_recv_buffer = vec![0; 65536]; + + // The quinn_proto endpoint can give us new connections for as long as its accept buffer + // isn't full. This buffer is used to push these new connections while we are waiting to + // send them on the `new_connections` channel. We only call `endpoint.accept()` when we remove + // an element from this list, which guarantees that it doesn't grow unbounded. + // TODO: with_capacity? + let mut queued_new_connections = VecDeque::new(); + + // Next packet waiting to be transmitted on the UDP socket, if any. + // Note that this variable isn't strictly necessary, but it reduces code duplication in the + // code below. + let mut next_packet_out: Option<(SocketAddr, Vec)> = None; + + // Main loop of the task. + loop { + // Start by flushing `next_packet_out`. + if let Some((destination, data)) = next_packet_out.take() { + // We block the current task until the packet is sent. This way, if the + // network interface is too busy, we back-pressure all of our internal + // channels. + // TODO: set ECN bits; there is no support for them in the ecosystem right now + match udp_socket.send_to(&data, destination).await { + Ok(n) if n == data.len() => {} + Ok(_) => log::error!( + "QUIC UDP socket violated expectation that packets are always fully \ + transferred" + ), + + // Errors on the socket are expected to never happen, and we handle them by simply + // printing a log message. The packet gets discarded in case of error, but we are + // robust to packet losses and it is consequently not a logic error to process with + // normal operations. + Err(err) => log::error!("Error while sending on QUIC UDP socket: {:?}", err), } } - while let Some(transmit) = me.endpoint.poll_transmit() { - me.transmit(transmit); + // The endpoint might request packets to be sent out. This is handled in priority to avoid + // buffering up packets. + if let Some(packet) = endpoint.poll_transmit() { + debug_assert!(next_packet_out.is_none()); + next_packet_out = Some((packet.destination, packet.contents)); + continue; } - if me.event_slot.is_none() { - while let Poll::Ready(event) = me.channel.poll_next_event(cx) { - match event { - Some(ToEndpoint::Dial { addr, tx }) => { - let crypto = TlsCrypto::new_client_config(&me.crypto_config); - let mut client_config = QuinnClientConfig::new(crypto); - client_config.transport = me.crypto_config.transport.clone(); - - let (id, connection) = - match me.endpoint.connect(client_config, addr, "server_name") { + futures::select! { + message = receiver.next() => { + // Received a message from a different part of the code requesting us to + // do something. + match message { + // Shut down if the endpoint has shut down. + None => return, + + Some(ToEndpoint::Dial { addr, result }) => { + // This `"l"` seems necessary because an empty string is an invalid domain + // name. While we don't use domain names, the underlying rustls library + // is based upon the assumption that we do. + let (connection_id, connection) = + match endpoint.connect(config.client_config.clone(), addr, "l") { Ok(c) => c, Err(err) => { - tracing::error!("dial failure: {}", err); - let _ = tx.send(Err(err.into())); + let _ = result.send(Err(err)); continue; } }; - let (channel, conn) = me.channel.create_connection(id); - me.connections.insert(id, conn); - let muxer = QuicMuxer::new(channel, connection); - tx.send(Ok(muxer)).ok(); + + let endpoint_arc = match endpoint_weak.upgrade() { + Some(ep) => ep, + None => return, // Shut down the task if the endpoint is dead. + }; + + debug_assert_eq!(connection.side(), quinn_proto::Side::Client); + let (tx, rx) = mpsc::channel(16); + let connection = Connection::from_quinn_connection(endpoint_arc, connection, connection_id, rx); + alive_connections.insert(connection_id, tx); + let _ = result.send(Ok(connection)); } - Some(ToEndpoint::ConnectionEvent { - connection_id, - event, - }) => { + + // A connection wants to notify the endpoint of something. + Some(ToEndpoint::ProcessConnectionEvent { connection_id, event }) => { + debug_assert!(alive_connections.contains_key(&connection_id)); + // We "drained" event indicates that the connection no longer exists and + // its ID can be reclaimed. let is_drained_event = event.is_drained(); if is_drained_event { - me.connections.remove(&connection_id); + alive_connections.remove(&connection_id); } - if let Some(event) = me.endpoint.handle_event(connection_id, event) { - if me.send_event(connection_id, event, cx) { - tracing::info!("filled event slot"); - break; - } + if let Some(event_back) = endpoint.handle_event(connection_id, event) { + debug_assert!(!is_drained_event); + // TODO: don't await here /!\ + alive_connections.get_mut(&connection_id).unwrap().send(event_back).await; } } - Some(ToEndpoint::Transmit(transmit)) => { - me.transmit(transmit); - } - None => { - me.endpoint.reject_new_connections(); - return Poll::Ready(()); + + // Data needs to be sent on the UDP socket. + Some(ToEndpoint::SendUdpPacket { destination, data }) => { + debug_assert!(next_packet_out.is_none()); + next_packet_out = Some((destination, data)); + continue; } } } - } - while !me.outgoing.is_empty() { - let transmits: &[_] = me.outgoing.make_contiguous(); - match me.socket.poll_send(cx, transmits) { - Poll::Ready(Ok(n)) => { - me.outgoing.drain(..n); + // The future we create here wakes up if two conditions are fulfilled: + // + // - The `new_connections` channel is ready to accept a new element. + // - `queued_new_connections` is not empty. + // + // When this happens, we pop an element from `queued_new_connections`, put it on the + // channel, and call `endpoint.accept()`, thereby allowing the QUIC state machine to + // feed a new incoming connection to us. + readiness = { + let active = !queued_new_connections.is_empty(); + let new_connections = &mut new_connections; + future::poll_fn(move |cx| { + if active { new_connections.poll_ready(cx) } else { Poll::Pending } + }).fuse() + } => { + if readiness.is_err() { + // new_connections channel has been dropped, meaning that the endpoint has + // been destroyed. + return; } - Poll::Ready(Err(err)) => tracing::error!("send_to: {}", err), - Poll::Pending => break, - } - } - if me.event_slot.is_none() && me.incoming_slot.is_none() { - let mut metas = [RecvMeta::default(); BATCH_SIZE]; - let mut iovs = MaybeUninit::<[IoSliceMut; BATCH_SIZE]>::uninit(); - fn init_iovs<'a>( - iovs: &'a mut MaybeUninit<[IoSliceMut<'a>; BATCH_SIZE]>, - recv_buf: &'a mut [u8], - ) -> &'a mut [IoSliceMut<'a>] { - let chunk_size = recv_buf.len() / BATCH_SIZE; - let chunks = recv_buf.chunks_mut(chunk_size); - // every iovs elem must be initialized with an according elem from buf chunks - assert_eq!(chunks.len(), BATCH_SIZE); - chunks.enumerate().for_each(|(i, buf)| unsafe { - iovs.as_mut_ptr() - .cast::() - .add(i) - .write(IoSliceMut::new(buf)); - }); - - unsafe { - // SAFETY: all elements are initialized - iovs.assume_init_mut() - } + let elem = queued_new_connections.pop_front() + .expect("if queue is empty, the future above is always Pending; qed"); + new_connections.start_send(elem) + .expect("future is waken up only if poll_ready returned Ready; qed"); + endpoint.accept(); } - let mut recv_buf = core::mem::replace(&mut me.recv_buf, Vec::new().into_boxed_slice()); - let iovs = init_iovs(&mut iovs, &mut recv_buf); - while let Poll::Ready(result) = me.socket.poll_recv(cx, iovs, &mut metas) { - let n = match result { - Ok(n) => n, + + result = udp_socket.recv_from(&mut socket_recv_buffer).fuse() => { + let (packet_len, packet_src) = match result { + Ok(v) => v, + // Errors on the socket are expected to never happen, and we handle them by + // simply printing a log message. Err(err) => { - tracing::error!("recv_from: {}", err); + log::error!("Error while receive on QUIC UDP socket: {:?}", err); continue; - } + }, }; - for i in 0..n { - let meta = &metas[i]; - let packet = From::from(&iovs[i][..meta.len]); - let ecn = meta - .ecn - .map(|ecn| EcnCodepoint::from_bits(ecn as u8)) - .unwrap_or_default(); - match me - .endpoint - .handle(Instant::now(), meta.source, meta.dst_ip, ecn, packet) - { - None => {} - Some((id, DatagramEvent::ConnectionEvent(event))) => { - if me.send_event(id, event, cx) { - tracing::info!("filled event slot"); - break; - } - } - Some((id, DatagramEvent::NewConnection(connection))) => { - let (channel, tx) = me.channel.create_connection(id); - me.connections.insert(id, tx); - let muxer = QuicMuxer::new(channel, connection); - if me.send_incoming(muxer, cx) { - tracing::info!("filled incoming slot"); - break; - } + + // Received a UDP packet from the socket. + debug_assert!(packet_len <= socket_recv_buffer.len()); + let packet = From::from(&socket_recv_buffer[..packet_len]); + // TODO: ECN bits aren't handled + match endpoint.handle(Instant::now(), packet_src, None, packet) { + None => {}, + Some((connec_id, quinn_proto::DatagramEvent::ConnectionEvent(event))) => { + // Event to send to an existing connection. + if let Some(sender) = alive_connections.get_mut(&connec_id) { + let _ = sender.send(event).await; // TODO: don't await here /!\ + } else { + log::error!("State mismatch: event for closed connection"); } - } + }, + Some((connec_id, quinn_proto::DatagramEvent::NewConnection(connec))) => { + // A new connection has been received. `connec_id` is a newly-allocated + // identifier. + debug_assert_eq!(connec.side(), quinn_proto::Side::Server); + let (tx, rx) = mpsc::channel(16); + alive_connections.insert(connec_id, tx); + let endpoint_arc = match endpoint_weak.upgrade() { + Some(ep) => ep, + None => return, // Shut down the task if the endpoint is dead. + }; + let connection = Connection::from_quinn_connection(endpoint_arc, connec, connec_id, rx); + + // As explained in the documentation, we put this new connection in an + // intermediary buffer. At the next loop iteration we will try to move it + // to the `new_connections` channel. We call `endpoint.accept()` only once + // the element has successfully been sent on `new_connections`. + queued_new_connections.push_back(connection); + }, } } - me.recv_buf = recv_buf; } + } +} - Poll::Pending +impl fmt::Debug for Endpoint { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Endpoint").finish() } } diff --git a/transports/quic/src/error.rs b/transports/quic/src/error.rs new file mode 100644 index 00000000000..9671950d4e3 --- /dev/null +++ b/transports/quic/src/error.rs @@ -0,0 +1,90 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::channel::mpsc::SendError; +use io::ErrorKind; +use std::io; +use thiserror::Error; + +/// An error that can be returned by libp2p-quic. +#[derive(Error, Debug)] +pub enum Error { + /// Fatal I/O error + #[error("Fatal I/O error {0}")] + IO(#[from] std::io::Error), + /// QUIC protocol error + #[error("QUIC protocol error: {0}")] + ConnectionError(#[from] quinn_proto::ConnectionError), + /// Peer stopped receiving data + #[error("Peer stopped receiving data: code {0}")] + Stopped(quinn_proto::VarInt), + /// Connection was prematurely closed + #[error("Connection was prematurely closed")] + ConnectionLost, + /// Error making the connection. + #[error("Connection failure: {0}")] + ConnectError(#[from] quinn_proto::ConnectError), + /// Cannot listen on the same endpoint more than once + #[error("Cannot listen on the same endpoint more than once")] + AlreadyListening, + /// The stream was reset by the peer. + #[error("Peer reset stream: code {0}")] + Reset(quinn_proto::VarInt), + /// Either an attempt was made to write to a stream that was already shut down, + /// or a previous operation on this stream failed. + #[error( + "Use of a stream that has is no longer valid. This is a \ + bug in the application." + )] + ExpiredStream, + /// Reading from a stream that has not been written to. + #[error("Reading from a stream that has not been written to.")] + CannotReadFromUnwrittenStream, + /// Fatal internal error or network failure + #[error("Fatal internal error or network failure")] + NetworkFailure, + /// Connection already being closed + #[error("Connection already being closed")] + ConnectionClosing, +} + +impl From for Error { + fn from(_: SendError) -> Error { + Error::NetworkFailure + } +} + +impl From for io::Error { + fn from(e: Error) -> Self { + match e { + Error::IO(e) => io::Error::new(e.kind(), Error::IO(e)), + Error::ConnectionError(e) => e.into(), + e @ Error::NetworkFailure + | e @ Error::ConnectionClosing + | e @ Error::ConnectError(_) => io::Error::new(ErrorKind::Other, e), + e @ Error::Stopped(_) | e @ Error::Reset(_) | e @ Error::ConnectionLost => { + io::Error::new(ErrorKind::ConnectionAborted, e) + } + e @ Error::ExpiredStream => io::Error::new(ErrorKind::BrokenPipe, e), + e @ Error::AlreadyListening => io::Error::new(ErrorKind::AddrInUse, e), + e @ Error::CannotReadFromUnwrittenStream => io::Error::new(ErrorKind::NotConnected, e), + } + } +} diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index d609f2eaaa6..97dc27ef996 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2021 David Craven. +// Copyright 2020 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,74 +18,52 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod crypto; -mod endpoint; -mod muxer; -mod tls; -mod transport; - -use crate::crypto::TlsCrypto; -pub use crate::muxer::{QuicMuxer, QuicMuxerError}; -pub use crate::transport::{QuicDial, QuicTransport}; -pub use quinn_proto::{ConfigError, ConnectError, ConnectionError, TransportConfig}; - -use libp2p_core::identity::Keypair; -use libp2p_core::transport::TransportError; -use libp2p_core::Multiaddr; -use std::sync::Arc; -use thiserror::Error; +#![recursion_limit = "1024"] -/// Quic configuration. -pub struct QuicConfig { - pub keypair: Keypair, - pub transport: TransportConfig, - pub keylogger: Option>, -} +//! Implementation of the libp2p `Transport` and `StreamMuxer` traits for QUIC. +//! +//! # Usage +//! +//! Example: +//! +//! ``` +//! use libp2p_quic::{Config, Endpoint}; +//! use libp2p_core::Multiaddr; +//! +//! let keypair = libp2p_core::identity::Keypair::generate_ed25519(); +//! let addr = "/ip4/127.0.0.1/udp/12345/quic".parse().expect("bad address?"); +//! let quic_config = Config::new(&keypair, addr).expect("could not make config"); +//! let quic_endpoint = Endpoint::new(quic_config).expect("I/O error"); +//! ``` +//! +//! The `Endpoint` struct implements the `Transport` trait of the `core` library. See the +//! documentation of `core` and of libp2p in general to learn how to use the `Transport` trait. +//! +//! Note that QUIC provides transport, security, and multiplexing in a single protocol. Therefore, +//! QUIC connections do not need to be upgraded. You will get a compile-time error if you try. +//! Instead, you must pass all needed configuration into the constructor. +//! +//! # Design Notes +//! +//! The entry point is the `Endpoint` struct. It represents a single QUIC endpoint. You +//! should generally have one of these per process. +//! +//! `Endpoint` manages a background task that processes all incoming packets. Each +//! `QuicConnection` also manages a background task, which handles socket output and timer polling. -impl std::fmt::Debug for QuicConfig { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("QuicConfig") - .field("keypair", &self.keypair.public()) - .field("transport", &self.transport) - .finish() - } -} +#![deny(unsafe_code)] -impl QuicConfig { - /// Creates a new config from a keypair. - pub fn new(keypair: Keypair) -> Self { - Self { - keypair, - transport: TransportConfig::default(), - keylogger: None, - } - } - - /// Enable keylogging. - pub fn enable_keylogger(&mut self) -> &mut Self { - self.keylogger = Some(TlsCrypto::keylogger()); - self - } +mod connection; +mod endpoint; +mod error; +mod muxer; +mod upgrade; +mod x509; - /// Spawns a new endpoint. - pub async fn listen_on( - self, - addr: Multiaddr, - ) -> Result> { - QuicTransport::new(self, addr).await - } -} +pub mod transport; -#[derive(Debug, Error)] -pub enum QuicError { - #[error("{0}")] - Config(#[from] ConfigError), - #[error("{0}")] - Connect(#[from] ConnectError), - #[error("{0}")] - Muxer(#[from] QuicMuxerError), - #[error("{0}")] - Io(#[from] std::io::Error), - #[error("a `StreamMuxerEvent` was generated before the handshake was complete.")] - UpgradeError, -} +pub use endpoint::{Config, Endpoint}; +pub use error::Error; +pub use muxer::QuicMuxer; +pub use transport::QuicTransport; +pub use upgrade::Upgrade; diff --git a/transports/quic/src/muxer.rs b/transports/quic/src/muxer.rs index 45cd5f06c8c..f3477b9562b 100644 --- a/transports/quic/src/muxer.rs +++ b/transports/quic/src/muxer.rs @@ -1,4 +1,4 @@ -// Copyright 2021 David Craven. +// Copyright 2020 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,422 +18,326 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::endpoint::ConnectionChannel; -use async_io::Timer; -use futures::prelude::*; +use crate::connection::{Connection, ConnectionEvent}; +use crate::error::Error; + use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use libp2p_core::{Multiaddr, PeerId}; use parking_lot::Mutex; -use quinn_proto::Connection as QuinnConnection; -use quinn_proto::{ - ConnectionError, Dir, Event, FinishError, ReadError, ReadableError, StreamEvent, StreamId, - VarInt, WriteError, +use std::{ + collections::HashMap, + fmt, + task::{Context, Poll, Waker}, }; -use std::collections::{HashMap, VecDeque}; -use std::io::Write; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; -use std::time::Instant; -use thiserror::Error; /// State for a single opened QUIC connection. pub struct QuicMuxer { + // Note: This could theoretically be an asynchronous future, in order to yield the current + // task if a task running in parallel is already holding the lock. However, using asynchronous + // mutexes without async/await is extremely tedious and maybe not worth the effort. inner: Mutex, } -impl std::fmt::Debug for QuicMuxer { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "QuicMuxer") - } -} - -/// Mutex protected fields of [`QuicMuxer`]. +/// Mutex-protected fields of [`QuicMuxer`]. struct QuicMuxerInner { - /// Accept incoming streams. - accept_incoming: bool, - /// Endpoint channel. - endpoint: ConnectionChannel, /// Inner connection object that yields events. - connection: QuinnConnection, - /// Connection waker. - waker: Option, - /// Connection timer. - timer: Option, - /// State of all open substreams. - substreams: HashMap, - /// Pending substreams. - pending_substreams: VecDeque, - /// Close waker. - close_waker: Option, + connection: Connection, + /// State of all the substreams that the muxer reports as open. + substreams: HashMap, + /// Waker to wake if a new outgoing substream is opened. + poll_substream_opened_waker: Option, + /// Waker to wake if the connection is closed. + poll_close_waker: Option, } /// State of a single substream. -#[derive(Debug, Default)] +#[derive(Default)] struct SubstreamState { - /// Waker to wake if the substream becomes readable. + /// Waker to wake if the substream becomes readable or stopped. read_waker: Option, - /// Waker to wake if the substream becomes writable. + /// Waker to wake if the substream becomes writable or stopped. write_waker: Option, + /// True if the substream has been closed. + finished: bool, + /// Waker to wake if the substream becomes closed or stopped. + finished_waker: Option, } impl QuicMuxer { - pub fn new(endpoint: ConnectionChannel, connection: QuinnConnection) -> Self { - Self { + /// Crate-internal function that builds a [`QuicMuxer`] from a raw connection. + /// + /// # Panic + /// + /// Panics if `connection.is_handshaking()` returns `true`. + pub(crate) fn from_connection(connection: Connection) -> Self { + assert!(!connection.is_handshaking()); + + QuicMuxer { inner: Mutex::new(QuicMuxerInner { - accept_incoming: false, - endpoint, connection, - waker: None, - timer: None, substreams: Default::default(), - pending_substreams: Default::default(), - close_waker: None, + poll_substream_opened_waker: None, + poll_close_waker: None, }), } } - - pub fn is_handshaking(&self) -> bool { - self.inner.lock().connection.is_handshaking() - } - - pub fn peer_id(&self) -> Option { - let inner = self.inner.lock(); - let session = inner.connection.crypto_session(); - let identity = session.peer_identity()?; - let certs: Box> = identity.downcast().ok()?; - let cert = certs.get(0)?; - let p2p_cert = crate::tls::certificate::parse_certificate(&cert.0).ok()?; - Some(PeerId::from_public_key(&p2p_cert.extension.public_key)) - } - - pub fn local_addr(&self) -> Multiaddr { - let inner = self.inner.lock(); - let ip = inner - .connection - .local_ip() - .unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)); - let addr = SocketAddr::new(ip, inner.endpoint.port()); - crate::transport::socketaddr_to_multiaddr(&addr) - } - - pub fn remote_addr(&self) -> Multiaddr { - let inner = self.inner.lock(); - let addr = inner.connection.remote_address(); - crate::transport::socketaddr_to_multiaddr(&addr) - } - - pub(crate) fn set_accept_incoming(&self, accept: bool) { - let mut inner = self.inner.lock(); - inner.accept_incoming = accept; - } } impl StreamMuxer for QuicMuxer { - type Substream = StreamId; type OutboundSubstream = (); - type Error = QuicMuxerError; + type Substream = quinn_proto::StreamId; + type Error = Error; - fn poll_event( - &self, - cx: &mut Context, - ) -> Poll, Self::Error>> { + // TODO: what if called multiple times? register all wakers? + fn poll_event(&self, cx: &mut Context<'_>) -> Poll, Self::Error>> { + // We use `poll_inbound` to perform the background processing of the entire connection. let mut inner = self.inner.lock(); - let now = Instant::now(); - - while let Poll::Ready(event) = inner.endpoint.poll_channel_events(cx) { - inner.connection.handle_event(event); - } - - let max_datagrams = inner.endpoint.max_datagrams(); - while let Some(transmit) = inner.connection.poll_transmit(now, max_datagrams) { - inner.endpoint.send_transmit(transmit); - } - - loop { - if let Some(timer) = inner.timer.as_mut() { - match Pin::new(timer).poll(cx) { - Poll::Ready(expired) => { - inner.connection.handle_timeout(expired); - inner.timer = None; - } - Poll::Pending => break, - } - } else if let Some(when) = inner.connection.poll_timeout() { - inner.timer = Some(Timer::at(when)); - } else { - break; - } - } - - while let Some(event) = inner.connection.poll_endpoint_events() { - inner.endpoint.send_endpoint_event(event); - } - while let Some(event) = inner.connection.poll() { + while let Poll::Ready(event) = inner.connection.poll_event(cx) { match event { - Event::HandshakeDataReady => {} - Event::Connected => { - // Break here so that the noise upgrade can finish. - return Poll::Pending; + ConnectionEvent::Connected => { + log::error!("Unexpected Connected event on established QUIC connection"); } - Event::ConnectionLost { reason } => { - tracing::debug!("connection lost because of {}", reason); - inner.substreams.clear(); - if let Some(waker) = inner.close_waker.take() { + ConnectionEvent::ConnectionLost(_) => { + if let Some(waker) = inner.poll_close_waker.take() { waker.wake(); } - return Poll::Ready(Err(QuicMuxerError::ConnectionLost { reason })); } - Event::Stream(StreamEvent::Opened { dir: Dir::Bi }) => { - // handled at end. - } - Event::Stream(StreamEvent::Readable { id }) => { - tracing::trace!("stream readable {}", id); - let substream = inner.substreams.get_mut(&id).expect("known substream; qed"); - if let Some(waker) = substream.read_waker.take() { - waker.wake(); - } - } - Event::Stream(StreamEvent::Writable { id }) => { - tracing::trace!("stream writable {}", id); - let substream = inner.substreams.get_mut(&id).expect("known substream; qed"); - if let Some(waker) = substream.write_waker.take() { + + ConnectionEvent::StreamOpened => { + if let Some(waker) = inner.poll_substream_opened_waker.take() { waker.wake(); } } - Event::Stream(StreamEvent::Finished { id }) => { - tracing::trace!("stream finished {}", id); - if let Some(substream) = inner.substreams.get_mut(&id) { + ConnectionEvent::StreamReadable(substream) => { + if let Some(substream) = inner.substreams.get_mut(&substream) { if let Some(waker) = substream.read_waker.take() { waker.wake(); } + } + } + ConnectionEvent::StreamWritable(substream) => { + if let Some(substream) = inner.substreams.get_mut(&substream) { if let Some(waker) = substream.write_waker.take() { waker.wake(); } } } - Event::Stream(StreamEvent::Stopped { id, error_code }) => { - tracing::debug!("substream {} stopped with error {}", id, error_code); - inner.substreams.remove(&id); - return Poll::Ready(Err(QuicMuxerError::StreamStopped { id, error_code })); - } - Event::Stream(StreamEvent::Available { dir: Dir::Bi }) => { - tracing::trace!("substream available"); - if let Some(waker) = inner.pending_substreams.pop_front() { - waker.wake(); + ConnectionEvent::StreamFinished(substream) | + ConnectionEvent::StreamStopped(substream) => { + if let Some(substream) = inner.substreams.get_mut(&substream) { + if let ConnectionEvent::StreamFinished(_) = event { + substream.finished = true; + } + if let Some(waker) = substream.read_waker.take() { + waker.wake(); + } + if let Some(waker) = substream.write_waker.take() { + waker.wake(); + } + if let Some(waker) = substream.finished_waker.take() { + waker.wake(); + } } } - Event::Stream(StreamEvent::Opened { dir: Dir::Uni }) - | Event::Stream(StreamEvent::Available { dir: Dir::Uni }) - | Event::DatagramReceived => { - // We don't use datagrams or unidirectional streams. If these events - // happen, it is by some code not compatible with libp2p-quic. - inner - .connection - .close(Instant::now(), From::from(0u32), Default::default()); - return Poll::Ready(Err(QuicMuxerError::ProtocolViolation)); + ConnectionEvent::StreamAvailable => { + // Handled below. } } } - // TODO quinn doesn't support `StreamMuxerEvent::AddressChange`. - - if inner.accept_incoming { - if let Some(id) = inner.connection.streams().accept(Dir::Bi) { - inner.substreams.insert(id, Default::default()); - tracing::trace!("opened incoming substream {}", id); - return Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id))); - } - } - - if inner.substreams.is_empty() { - if let Some(waker) = inner.close_waker.take() { - waker.wake(); - } + if let Some(substream) = inner.connection.pop_incoming_substream() { + inner.substreams.insert(substream, Default::default()); + Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) + } else { + Poll::Pending } - - inner.waker = Some(cx.waker().clone()); - Poll::Pending } - fn open_outbound(&self) -> Self::OutboundSubstream {} + fn open_outbound(&self) -> Self::OutboundSubstream { + () + } + // TODO: what if called multiple times? register all wakers? fn poll_outbound( &self, - cx: &mut Context, + cx: &mut Context<'_>, _: &mut Self::OutboundSubstream, ) -> Poll> { + // Note: this implementation makes it possible to poll the same `Self::OutboundSubstream` + // over and over again and get new substreams. Using the API this way is invalid and would + // normally result in a panic, but we decide to just ignore this problem. + let mut inner = self.inner.lock(); - if let Some(id) = inner.connection.streams().open(Dir::Bi) { - tracing::trace!("opened outgoing substream {}", id); - inner.substreams.insert(id, Default::default()); - if let Some(waker) = inner.pending_substreams.pop_front() { - waker.wake(); - } - Poll::Ready(Ok(id)) - } else { - inner.pending_substreams.push_back(cx.waker().clone()); - Poll::Pending + if let Some(substream) = inner.connection.pop_outgoing_substream() { + inner.substreams.insert(substream, Default::default()); + return Poll::Ready(Ok(substream)); + } + + // Register `cx.waker()` as having to be woken up once a substream is available. + if !inner + .poll_substream_opened_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + inner.poll_substream_opened_waker = Some(cx.waker().clone()); } + + Poll::Pending } fn destroy_outbound(&self, _: Self::OutboundSubstream) {} - fn read_substream( + fn write_substream( &self, - cx: &mut Context, - id: &mut Self::Substream, - mut buf: &mut [u8], + cx: &mut Context<'_>, + substream: &mut Self::Substream, + buf: &[u8], ) -> Poll> { let mut inner = self.inner.lock(); - let mut stream = inner.connection.recv_stream(*id); - let mut chunks = match stream.read(true) { - Ok(chunks) => chunks, - Err(ReadableError::UnknownStream) => { - return Poll::Ready(Err(QuicMuxerError::UnknownStream { id: *id })) - } - Err(ReadableError::IllegalOrderedRead) => { - panic!("Illegal ordered read can only happen if `stream.read(false)` is used."); - } - }; - let mut bytes = 0; - let mut pending = false; - loop { - if buf.is_empty() { - break; - } - match chunks.next(buf.len()) { - Ok(Some(chunk)) => { - buf.write_all(&chunk.bytes).expect("enough buffer space"); - bytes += chunk.bytes.len(); - } - Ok(None) => break, - Err(ReadError::Reset(error_code)) => { - tracing::debug!("substream {} was reset with error code {}", id, error_code); - bytes = 0; - break; - } - Err(ReadError::Blocked) => { - pending = true; - break; + + match inner.connection.write_substream(*substream, buf) { + Ok(bytes) => Poll::Ready(Ok(bytes)), + Err(quinn_proto::WriteError::Stopped(err_code)) => { + Poll::Ready(Err(Error::Reset(err_code))) + }, + Err(quinn_proto::WriteError::Blocked) => { + if let Some(substream) = inner.substreams.get_mut(substream) { + if !substream + .write_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + substream.write_waker = Some(cx.waker().clone()); + } } + Poll::Pending } - } - if chunks.finalize().should_transmit() { - if let Some(waker) = inner.waker.take() { - waker.wake(); + Err(quinn_proto::WriteError::UnknownStream) => { + log::error!( + "The application used a connection that is already being \ + closed. This is a bug in the application or in libp2p." + ); + Poll::Pending } } - let substream = inner.substreams.get_mut(id).expect("known substream; qed"); - if pending && bytes == 0 { - substream.read_waker = Some(cx.waker().clone()); - Poll::Pending - } else { - Poll::Ready(Ok(bytes)) - } } - fn write_substream( + fn read_substream( &self, - cx: &mut Context, - id: &mut Self::Substream, - buf: &[u8], + cx: &mut Context<'_>, + substream: &mut Self::Substream, + buf: &mut [u8], ) -> Poll> { let mut inner = self.inner.lock(); - match inner.connection.send_stream(*id).write(buf) { + + match inner.connection.read_substream(*substream, buf) { Ok(bytes) => Poll::Ready(Ok(bytes)), - Err(WriteError::Blocked) => { - let mut substream = inner.substreams.get_mut(id).expect("known substream; qed"); - substream.write_waker = Some(cx.waker().clone()); + Err(quinn_proto::ReadError::Blocked) => { + if let Some(substream) = inner.substreams.get_mut(substream) { + if !substream + .read_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + substream.read_waker = Some(cx.waker().clone()); + } + } Poll::Pending } - Err(WriteError::Stopped(_)) => Poll::Ready(Ok(0)), - Err(WriteError::UnknownStream) => { - Poll::Ready(Err(QuicMuxerError::UnknownStream { id: *id })) + + Err(quinn_proto::ReadError::Reset(err_code)) => { + Poll::Ready(Err(Error::Reset(err_code))) + }, + + // `IllegalOrderedRead` happens if an unordered read followed with an ordered read are + // performed. `libp2p-quic` never does any unordered read. + Err(quinn_proto::ReadError::IllegalOrderedRead) => unreachable!(), + Err(quinn_proto::ReadError::UnknownStream) => { + log::error!( + "The application used a connection that is already being \ + closed. This is a bug in the application or in libp2p." + ); + Poll::Pending } } } fn shutdown_substream( &self, - _: &mut Context, - id: &mut Self::Substream, + cx: &mut Context<'_>, + substream: &mut Self::Substream, ) -> Poll> { - tracing::trace!("closing substream {}", id); - // closes the write end of the substream without waiting for the remote to receive the - // event. use flush substream to wait for the remote to receive the event. let mut inner = self.inner.lock(); - match inner.connection.send_stream(*id).finish() { - Ok(()) => Poll::Ready(Ok(())), - Err(FinishError::Stopped(_)) => Poll::Ready(Ok(())), - Err(FinishError::UnknownStream) => { - Poll::Ready(Err(QuicMuxerError::UnknownStream { id: *id })) - } + let inner = &mut *inner; + + let mut substream_state = inner.substreams.get_mut(substream) + .expect("invalid StreamMuxer::shutdown_substream API usage"); + if substream_state.finished { + return Poll::Ready(Ok(())) + } + + match inner.connection.shutdown_substream(*substream) { + Ok(()) => { + match substream_state.finished_waker { + Some(ref w) if w.will_wake(cx.waker()) => {}, + _ => substream_state.finished_waker = Some(cx.waker().clone()), + } + Poll::Pending + }, + Err(quinn_proto::FinishError::Stopped(err)) => Poll::Ready(Err(Error::Reset(err))), + Err(quinn_proto::FinishError::UnknownStream) => { + // Illegal usage of the API. + debug_assert!(false); + Poll::Ready(Err(Error::ExpiredStream)) + }, } } - fn destroy_substream(&self, id: Self::Substream) { - tracing::trace!("destroying substream {}", id); + fn destroy_substream(&self, substream: Self::Substream) { let mut inner = self.inner.lock(); - inner.substreams.remove(&id); - let mut stream = inner.connection.recv_stream(id); - let should_transmit = if let Ok(mut chunks) = stream.read(true) { - while let Ok(Some(_)) = chunks.next(usize::MAX) {} - chunks.finalize().should_transmit() - } else { - false - }; - if should_transmit { - if let Some(waker) = inner.waker.take() { - waker.wake(); - } - } + inner.substreams.remove(&substream); } fn flush_substream( &self, - _cx: &mut Context, - _id: &mut Self::Substream, + cx: &mut Context<'_>, + _: &mut Self::Substream, ) -> Poll> { - // quinn doesn't support flushing, calling close will flush all substreams. - Poll::Ready(Ok(())) + self.flush_all(cx) } - fn flush_all(&self, _cx: &mut Context) -> Poll> { - // quinn doesn't support flushing, calling close will flush all substreams. + // TODO: what if called multiple times? register all wakers? + fn flush_all(&self, _cx: &mut Context<'_>) -> Poll> { + // TODO: call poll_transmit() and stuff Poll::Ready(Ok(())) } - fn close(&self, cx: &mut Context) -> Poll> { - tracing::trace!("closing muxer"); - let mut inner = self.inner.lock(); - if inner.substreams.is_empty() { - return Poll::Ready(Ok(())); + // TODO: what if called multiple times? register all wakers? + fn close(&self, cx: &mut Context<'_>) -> Poll> { + // StreamMuxer's `close` documentation mentions that it automatically implies `flush_all`. + if let Poll::Pending = self.flush_all(cx)? { + return Poll::Pending; } - inner.close_waker = Some(cx.waker().clone()); - let inner = &mut *inner; - for id in inner.substreams.keys() { - let _ = inner.connection.send_stream(*id).finish(); + + // TODO: poll if closed or something + + let mut inner = self.inner.lock(); + //self.connection.close(); + + // Register `cx.waker()` as being woken up if the connection closes. + if !inner + .poll_close_waker + .as_ref() + .map_or(false, |w| w.will_wake(cx.waker())) + { + inner.poll_close_waker = Some(cx.waker().clone()); } Poll::Pending } } -#[derive(Debug, Error)] -pub enum QuicMuxerError { - #[error("connection was lost because of {reason}")] - ConnectionLost { reason: ConnectionError }, - #[error("unsupported quic feature used")] - ProtocolViolation, - #[error("stream {id} stopped with error {error_code}")] - StreamStopped { id: StreamId, error_code: VarInt }, - #[error("unknown stream {id}")] - UnknownStream { id: StreamId }, -} - -impl From for std::io::Error { - fn from(err: QuicMuxerError) -> Self { - std::io::Error::new(std::io::ErrorKind::Other, err) +impl fmt::Debug for QuicMuxer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("QuicMuxer").finish() } } diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index c3ffd428c64..a0934418f15 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -1,4 +1,4 @@ -// Copyright 2021 David Craven. +// Copyright 2017-2020 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,247 +18,120 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::endpoint::{EndpointConfig, TransportChannel}; -use crate::muxer::QuicMuxer; -use crate::{QuicConfig, QuicError}; -use futures::channel::oneshot; -use futures::prelude::*; -use if_watch::{IfEvent, IfWatcher}; -use libp2p_core::multiaddr::{Multiaddr, Protocol}; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; -use libp2p_core::transport::{Boxed, ListenerEvent, Transport, TransportError}; -use libp2p_core::PeerId; -use parking_lot::Mutex; -use std::net::{IpAddr, SocketAddr}; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use udp_socket::SocketType; - -#[derive(Clone)] -pub struct QuicTransport { - inner: Arc>, -} - -impl QuicTransport { - /// Creates a new quic transport. - pub async fn new( - config: QuicConfig, - addr: Multiaddr, - ) -> Result> { - let socket_addr = multiaddr_to_socketaddr(&addr) - .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; - let addresses = if socket_addr.ip().is_unspecified() { - let watcher = IfWatcher::new() - .await - .map_err(|err| TransportError::Other(err.into()))?; - Addresses::Unspecified(watcher) - } else { - Addresses::Ip(Some(socket_addr.ip())) - }; - let endpoint = EndpointConfig::new(config, socket_addr).map_err(TransportError::Other)?; - Ok(Self { - inner: Arc::new(Mutex::new(QuicTransportInner { - channel: endpoint.spawn(), - addresses, - })), - }) - } - - /// Creates a boxed libp2p transport. - pub fn boxed(self) -> Boxed<(PeerId, StreamMuxerBox)> { - Transport::map(self, |(peer_id, muxer), _| { - (peer_id, StreamMuxerBox::new(muxer)) - }) - .boxed() - } -} - -impl std::fmt::Debug for QuicTransport { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("QuicTransport").finish() - } -} +//! Implementation of the [`Transport`] trait for QUIC. +//! +//! Combines all the objects in the other modules to implement the trait. -struct QuicTransportInner { - channel: TransportChannel, - addresses: Addresses, -} +use crate::{endpoint::Endpoint, muxer::QuicMuxer, upgrade::Upgrade}; -enum Addresses { - Unspecified(IfWatcher), - Ip(Option), +use futures::prelude::*; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + transport::{ListenerEvent, TransportError}, + PeerId, Transport, +}; +use std::{net::SocketAddr, pin::Pin, sync::Arc}; + +// We reexport the errors that are exposed in the API. +// All of these types use one another. +pub use crate::connection::Error as Libp2pQuicConnectionError; +pub use quinn_proto::{ + ApplicationClose, ConfigError, ConnectError, ConnectionClose, ConnectionError, + TransportError as QuinnTransportError, TransportErrorCode, +}; + +/// Wraps around an `Arc` and implements the [`Transport`] trait. +/// +/// > **Note**: This type is necessary because Rust unfortunately forbids implementing the +/// > `Transport` trait directly on `Arc`. +#[derive(Debug, Clone)] +pub struct QuicTransport(pub Arc); + +/// Error that can happen on the transport. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Error while trying to reach a remote. + #[error("{0}")] + Reach(ConnectError), + /// Error after the remote has been reached. + #[error("{0}")] + Established(Libp2pQuicConnectionError), } impl Transport for QuicTransport { type Output = (PeerId, QuicMuxer); - type Error = QuicError; - type Listener = Self; - type ListenerUpgrade = QuicUpgrade; - type Dial = QuicDial; + type Error = Error; + type Listener = Pin< + Box, Self::Error>> + Send>, + >; + type ListenerUpgrade = Upgrade; + type Dial = Pin> + Send>>; fn listen_on(self, addr: Multiaddr) -> Result> { - multiaddr_to_socketaddr(&addr) - .ok_or_else(|| TransportError::MultiaddrNotSupported(addr))?; - Ok(self) - } - - fn dial(self, addr: Multiaddr) -> Result> { - let socket_addr = multiaddr_to_socketaddr(&addr).ok_or_else(|| { - tracing::debug!("invalid multiaddr"); - TransportError::MultiaddrNotSupported(addr.clone()) - })?; - if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { - tracing::debug!("invalid multiaddr"); - return Err(TransportError::MultiaddrNotSupported(addr)); - } - tracing::debug!("dialing {}", socket_addr); - let rx = self.inner.lock().channel.dial(socket_addr); - Ok(QuicDial::Dialing(rx)) - } - - fn address_translation(&self, _listen: &Multiaddr, observed: &Multiaddr) -> Option { - Some(observed.clone()) - } -} - -impl Stream for QuicTransport { - type Item = Result, QuicError>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let mut inner = self.inner.lock(); - match &mut inner.addresses { - Addresses::Ip(ip) => { - if let Some(ip) = ip.take() { - let addr = socketaddr_to_multiaddr(&SocketAddr::new(ip, inner.channel.port())); - return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(addr)))); - } - } - Addresses::Unspecified(watcher) => match Pin::new(watcher).poll(cx) { - Poll::Ready(Ok(IfEvent::Up(net))) => { - if inner.channel.ty() == SocketType::Ipv4 && net.addr().is_ipv4() - || inner.channel.ty() != SocketType::Ipv4 && net.addr().is_ipv6() - { - let addr = socketaddr_to_multiaddr(&SocketAddr::new( - net.addr(), - inner.channel.port(), - )); - return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(addr)))); - } - } - Poll::Ready(Ok(IfEvent::Down(net))) => { - if inner.channel.ty() == SocketType::Ipv4 && net.addr().is_ipv4() - || inner.channel.ty() != SocketType::Ipv4 && net.addr().is_ipv6() - { - let addr = socketaddr_to_multiaddr(&SocketAddr::new( - net.addr(), - inner.channel.port(), - )); - return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(addr)))); - } - } - Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err.into()))), - Poll::Pending => {} - }, - } - match inner.channel.poll_incoming(cx) { - Poll::Ready(Some(Ok(muxer))) => Poll::Ready(Some(Ok(ListenerEvent::Upgrade { - local_addr: muxer.local_addr(), - remote_addr: muxer.remote_addr(), - upgrade: QuicUpgrade::new(muxer), - }))), - Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - -#[allow(clippy::large_enum_variant)] -pub enum QuicDial { - Dialing(oneshot::Receiver>), - Upgrade(QuicUpgrade), -} - -impl Future for QuicDial { - type Output = Result<(PeerId, QuicMuxer), QuicError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - loop { - match &mut *self { - Self::Dialing(rx) => match Pin::new(rx).poll(cx) { - Poll::Ready(Ok(Ok(muxer))) => { - *self = Self::Upgrade(QuicUpgrade::new(muxer)); - } - Poll::Ready(Ok(Err(err))) => return Poll::Ready(Err(err)), - Poll::Ready(Err(_)) => panic!("endpoint crashed"), - Poll::Pending => return Poll::Pending, - }, - Self::Upgrade(upgrade) => return Pin::new(upgrade).poll(cx), + // TODO: check address correctness + + // TODO: report the locally opened addresses + + Ok(stream::unfold((), move |()| { + let endpoint = self.0.clone(); + let addr = addr.clone(); + async move { + let connec = endpoint.next_incoming().await; + let remote_addr = socketaddr_to_multiaddr(&connec.remote_addr()); + let event = Ok(ListenerEvent::Upgrade { + upgrade: Upgrade::from_connection(connec), + local_addr: addr.clone(), // TODO: hack + remote_addr, + }); + Some((event, ())) } - } + }) + .boxed()) } -} - -pub struct QuicUpgrade { - muxer: Option, -} - -impl QuicUpgrade { - fn new(muxer: QuicMuxer) -> Self { - Self { muxer: Some(muxer) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + panic!("not implemented") } -} - -impl Future for QuicUpgrade { - type Output = Result<(PeerId, QuicMuxer), QuicError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let inner = Pin::into_inner(self); - let muxer = inner.muxer.as_mut().expect("future polled after ready"); - match muxer.poll_event(cx) { - Poll::Pending => { - if let Some(peer_id) = muxer.peer_id() { - muxer.set_accept_incoming(true); - Poll::Ready(Ok(( - peer_id, - inner.muxer.take().expect("future polled after ready"), - ))) - } else { - Poll::Pending - } - } - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), - Poll::Ready(Ok(_)) => { - unreachable!("muxer.incoming is set to false so no events can be produced"); + fn dial(self, addr: Multiaddr) -> Result> { + let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); } + socket_addr + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + + Ok(async move { + let connection = self.0.dial(socket_addr).await.map_err(Error::Reach)?; + let final_connec = Upgrade::from_connection(connection).await?; + Ok(final_connec) } + .boxed()) } } -/// Tries to turn a QUIC multiaddress into a UDP [`SocketAddr`]. Returns None if the format +/// Tries to turn a QUIC multiaddress into a UDP [`SocketAddr`]. Returns an error if the format /// of the multiaddr is wrong. -fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option { +pub(crate) fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { let mut iter = addr.iter(); - let proto1 = iter.next()?; - let proto2 = iter.next()?; - let proto3 = iter.next()?; + let proto1 = iter.next().ok_or(())?; + let proto2 = iter.next().ok_or(())?; + let proto3 = iter.next().ok_or(())?; - while let Some(proto) = iter.next() { - match proto { - Protocol::P2p(_) => {} // Ignore a `/p2p/...` prefix of possibly outer protocols, if present. - _ => return None, - } + if iter.next().is_some() { + return Err(()); } match (proto1, proto2, proto3) { (Protocol::Ip4(ip), Protocol::Udp(port), Protocol::Quic) => { - Some(SocketAddr::new(ip.into(), port)) + Ok(SocketAddr::new(ip.into(), port)) } (Protocol::Ip6(ip), Protocol::Udp(port), Protocol::Quic) => { - Some(SocketAddr::new(ip.into(), port)) + Ok(SocketAddr::new(ip.into(), port)) } - _ => None, + _ => Err(()), } } @@ -271,67 +144,54 @@ pub(crate) fn socketaddr_to_multiaddr(socket_addr: &SocketAddr) -> Multiaddr { } #[cfg(test)] -mod tests { - use super::*; - - #[test] - fn multiaddr_to_socketaddr_conversion() { - use std::net::{Ipv4Addr, Ipv6Addr}; +#[test] +fn multiaddr_to_udp_conversion() { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; - assert!( - multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) - .is_none() - ); + assert!( + multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()).is_err() + ); - assert_eq!( - multiaddr_to_socketaddr( - &"/ip4/127.0.0.1/udp/12345/quic" - .parse::() - .unwrap() - ), - Some(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 12345, - )) - ); - - assert!(multiaddr_to_socketaddr( - &"/ip4/127.0.0.1/udp/12345/quic/tcp/12345" + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/12345/quic" .parse::() .unwrap() - ) - .is_none()); - - assert_eq!( - multiaddr_to_socketaddr( - &"/ip4/255.255.255.255/udp/8080/quic" - .parse::() - .unwrap() - ), - Some(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), - 8080, - )) - ); - assert_eq!( - multiaddr_to_socketaddr(&"/ip6/::1/udp/12345/quic".parse::().unwrap()), - Some(SocketAddr::new( - IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), - 12345, - )) - ); - assert_eq!( - multiaddr_to_socketaddr( - &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic" - .parse::() - .unwrap() - ), - Some(SocketAddr::new( - IpAddr::V6(Ipv6Addr::new( - 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, - )), - 8080, - )) - ); - } + ), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/255.255.255.255/udp/8080/quic" + .parse::() + .unwrap() + ), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), + 8080, + )) + ); + assert_eq!( + multiaddr_to_socketaddr(&"/ip6/::1/udp/12345/quic".parse::().unwrap()), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic" + .parse::() + .unwrap() + ), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, + )), + 8080, + )) + ); } diff --git a/transports/quic/src/upgrade.rs b/transports/quic/src/upgrade.rs new file mode 100644 index 00000000000..71f2d00b285 --- /dev/null +++ b/transports/quic/src/upgrade.rs @@ -0,0 +1,87 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Future that drives a QUIC connection until is has performed its TLS handshake. + +use crate::{ + connection::{Connection, ConnectionEvent}, + muxer::QuicMuxer, + transport, x509, +}; + +use futures::prelude::*; +use libp2p_core::PeerId; +use std::{ + fmt, + pin::Pin, + task::{Context, Poll}, +}; + +/// A QUIC connection currently being negotiated. +pub struct Upgrade { + connection: Option, +} + +impl Upgrade { + /// Builds an [`Upgrade`] that wraps around a [`Connection`]. + pub(crate) fn from_connection(connection: Connection) -> Self { + Upgrade { + connection: Some(connection), + } + } +} + +impl Future for Upgrade { + type Output = Result<(PeerId, QuicMuxer), transport::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let connection = match self.connection.as_mut() { + Some(c) => c, + None => panic!("Future polled after it has ended"), + }; + + loop { + if let Some(mut certificates) = connection.peer_certificates() { + let peer_id = x509::extract_peerid_or_panic(certificates.next().unwrap().as_der()); // TODO: bad API + let muxer = QuicMuxer::from_connection(self.connection.take().unwrap()); + return Poll::Ready(Ok((peer_id, muxer))); + } + + match Connection::poll_event(connection, cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(ConnectionEvent::Connected) => { + // `is_handshaking()` will return `false` at the next loop iteration. + continue; + } + Poll::Ready(ConnectionEvent::ConnectionLost(err)) => { + return Poll::Ready(Err(transport::Error::Established(err))); + } + // TODO: enumerate the items and explain how they can't happen + _ => unreachable!(), + } + } + } +} + +impl fmt::Debug for Upgrade { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.connection, f) + } +}