Skip to content

Commit

Permalink
Decouple incoming streams of differing directionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith authored and djc committed Sep 29, 2019
1 parent 8e1c6b6 commit 2dcb084
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 178 deletions.
18 changes: 2 additions & 16 deletions quinn-h3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use futures::{ready, stream::Stream, Future, Poll};
use http::{request, HeaderMap, Request, Response};
use quinn::{Endpoint, OpenBi};
use quinn_proto::StreamId;
use slog::{self, o, Logger};

use crate::{
body::{Body, BodyWriter, RecvBody},
Expand All @@ -23,23 +22,16 @@ use crate::{

#[derive(Clone, Debug, Default)]
pub struct Builder {
log: Option<Logger>,
settings: Settings,
}

impl Builder {
pub fn new() -> Self {
Self {
log: None,
settings: Settings::default(),
}
}

pub fn logger(&mut self, log: Logger) -> &mut Self {
self.log = Some(log);
self
}

pub fn settings(&mut self, settings: Settings) -> &mut Self {
self.settings = settings;
self
Expand All @@ -49,16 +41,12 @@ impl Builder {
Client {
endpoint: endpoint,
settings: self.settings,
log: self
.log
.unwrap_or_else(|| Logger::root(slog::Discard, o!())),
}
}
}

pub struct Client {
endpoint: Endpoint,
log: Logger,
settings: Settings,
}

Expand All @@ -69,7 +57,6 @@ impl Client {
server_name: &str,
) -> Result<Connecting, quinn::ConnectError> {
Ok(Connecting {
log: self.log.clone(),
settings: self.settings.clone(),
connecting: self.endpoint.connect(addr, server_name)?,
})
Expand All @@ -96,7 +83,6 @@ impl Connection {

pub struct Connecting {
connecting: quinn::Connecting,
log: Logger,
settings: Settings,
}

Expand All @@ -107,13 +93,13 @@ impl Future for Connecting {
let quinn::NewConnection {
driver,
connection,
streams,
bi_streams,
..
} = ready!(Pin::new(&mut self.connecting).poll(cx))?;
let conn_ref = ConnectionRef::new(connection, self.settings.clone())?;
Poll::Ready(Ok((
driver,
ConnectionDriver::new(conn_ref.clone(), streams, self.log.clone()),
ConnectionDriver::new(conn_ref.clone(), bi_streams),
Connection(conn_ref),
)))
}
Expand Down
19 changes: 5 additions & 14 deletions quinn-h3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Waker};

use futures::{Future, Poll, Stream};
use quinn::{NewStream, RecvStream, SendStream};
use slog::{info, Logger};
use quinn::{RecvStream, SendStream};

use crate::{
proto::connection::{Connection, Error as ProtoError},
Expand All @@ -14,17 +13,12 @@ use crate::{

pub struct ConnectionDriver {
conn: ConnectionRef,
incoming: quinn::IncomingStreams,
log: Logger,
incoming: quinn::IncomingBiStreams,
}

impl ConnectionDriver {
pub(crate) fn new(conn: ConnectionRef, incoming: quinn::IncomingStreams, log: Logger) -> Self {
Self {
conn,
incoming,
log,
}
pub(crate) fn new(conn: ConnectionRef, incoming: quinn::IncomingBiStreams) -> Self {
Self { conn, incoming }
}
}

Expand All @@ -34,10 +28,7 @@ impl Future for ConnectionDriver {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Pin::new(&mut self.incoming).poll_next(cx)? {
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Ready(Some(NewStream::Uni(_recv))) => {
info!(self.log, "incoming uni stream ignored");
}
Poll::Ready(Some(NewStream::Bi(send, recv))) => {
Poll::Ready(Some((send, recv))) => {
let mut conn = self.conn.h3.lock().unwrap();
conn.requests.push_back((send, recv));
if let Some(t) = conn.requests_task.take() {
Expand Down
18 changes: 2 additions & 16 deletions quinn-h3/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use futures::{ready, Future, Poll, Stream};
use http::{response, HeaderMap, Request, Response};
use quinn::{EndpointBuilder, EndpointDriver, EndpointError, RecvStream, SendStream};
use quinn_proto::StreamId;
use slog::{self, o, Logger};

use crate::{
body::{Body, BodyWriter, RecvBody},
Expand All @@ -23,24 +22,17 @@ use crate::{

pub struct Builder {
endpoint: EndpointBuilder,
log: Option<Logger>,
settings: Settings,
}

impl Builder {
pub fn new(endpoint: EndpointBuilder) -> Self {
Self {
endpoint,
log: None,
settings: Settings::default(),
}
}

pub fn logger(&mut self, log: Logger) -> &mut Self {
self.log = Some(log);
self
}

pub fn settings(&mut self, settings: Settings) -> &mut Self {
self.settings = settings;
self
Expand All @@ -57,9 +49,6 @@ impl Builder {
IncomingConnection {
incoming,
settings: self.settings.clone(),
log: self
.log
.unwrap_or_else(|| Logger::root(slog::Discard, o!())),
},
))
}
Expand All @@ -68,7 +57,6 @@ impl Builder {
pub struct Server;

pub struct IncomingConnection {
log: Logger,
incoming: quinn::Incoming,
settings: Settings,
}
Expand All @@ -80,7 +68,6 @@ impl Stream for IncomingConnection {
Poll::Ready(
ready!(Pin::new(&mut self.incoming).poll_next(cx)).map(|c| Connecting {
connecting: c,
log: self.log.clone(),
settings: self.settings.clone(),
}),
)
Expand All @@ -89,7 +76,6 @@ impl Stream for IncomingConnection {

pub struct Connecting {
connecting: quinn::Connecting,
log: Logger,
settings: Settings,
}

Expand All @@ -100,13 +86,13 @@ impl Future for Connecting {
let quinn::NewConnection {
driver,
connection,
streams,
bi_streams,
..
} = ready!(Pin::new(&mut self.connecting).poll(cx))?;
let conn_ref = ConnectionRef::new(connection, self.settings.clone())?;
Poll::Ready(Ok((
driver,
ConnectionDriver::new(conn_ref.clone(), streams, self.log.clone()),
ConnectionDriver::new(conn_ref.clone(), bi_streams),
IncomingRequest(conn_ref),
)))
}
Expand Down
32 changes: 17 additions & 15 deletions quinn-proto/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ where
prev_crypto: Option<PrevCrypto<S::Keys>>,
/// Latest PATH_CHALLENGE token issued to the peer along the current path
path_challenge: Option<u64>,
/// Whether the remote endpoint has opened any streams the application doesn't know about yet
stream_opened: bool,
/// Whether the remote endpoint has opened any streams the application doesn't know about yet,
/// per directionality
stream_opened: [bool; 2],
accepted_0rtt: bool,
/// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
permit_idle_reset: bool,
Expand Down Expand Up @@ -229,7 +230,7 @@ where
highest_space: SpaceId::Initial,
prev_crypto: None,
path_challenge: None,
stream_opened: false,
stream_opened: [false, false],
accepted_0rtt: false,
permit_idle_reset: true,
idle_timeout: config.idle_timeout,
Expand Down Expand Up @@ -293,8 +294,9 @@ where
/// - an incoming packet is handled, or
/// - the idle timer expires
pub fn poll(&mut self) -> Option<Event> {
if mem::replace(&mut self.stream_opened, false) {
return Some(Event::StreamOpened);
for dir in Dir::iter().filter(|&i| mem::replace(&mut self.stream_opened[i as usize], false))
{
return Some(Event::StreamOpened { dir });
}

if let Some(x) = self.events.pop_front() {
Expand Down Expand Up @@ -1983,7 +1985,7 @@ where
let next = &mut self.streams.next_remote[stream.dir() as usize];
if stream.index() >= *next {
*next = stream.index() + 1;
self.stream_opened = true;
self.stream_opened[stream.dir() as usize] = true;
} else if notify_readable {
self.events.push_back(Event::StreamReadable { stream });
}
Expand Down Expand Up @@ -2678,14 +2680,11 @@ where
self.streams.alloc_remote_stream(self.side, dir);
}

/// Accept a remotely initiated stream if possible
/// Accept a remotely initiated stream of a certain directionality, if possible
///
/// Returns `None` if there are no new incoming streams for this connection.
pub fn accept(&mut self) -> Option<StreamId> {
let id = self
.streams
.accept(self.side, Dir::Uni)
.or_else(|| self.streams.accept(self.side, Dir::Bi))?;
pub fn accept(&mut self, dir: Dir) -> Option<StreamId> {
let id = self.streams.accept(self.side, dir)?;
self.alloc_remote_stream(id.dir());
Some(id)
}
Expand Down Expand Up @@ -3408,8 +3407,11 @@ pub enum Event {
/// Reason that the connection was closed
reason: ConnectionError,
},
/// One or more new streams has been opened and is readable
StreamOpened,
/// One or more new streams has been opened
StreamOpened {
/// Directionality for which streams have been opened
dir: Dir,
},
/// An existing stream has data or errors waiting to be read
StreamReadable {
/// Which stream is now readable
Expand All @@ -3429,7 +3431,7 @@ pub enum Event {
},
/// At least one new stream of a certain directionality may be opened
StreamAvailable {
/// On which direction streams are newly available
/// Directionality for which streams are newly available
dir: Dir,
},
}
Expand Down
Loading

0 comments on commit 2dcb084

Please sign in to comment.