Skip to content

Commit

Permalink
HTTP/3 support. Uses the h3 crate. Quinn for the QUIC stack.
Browse files Browse the repository at this point in the history
- Adds alt-svc headers to responses
- Removed Async{Read, Write} for ResponseBodyPipe,
  since they are inefficient for HTTP/2&/3 implementations.
- Redesigned reverse proxy & WebSocket.
- send_response, etc, all take ownership. This is elegant.
- WebSocket fails for HTTP/2. Before it silently errored.

TODO:
- WebSocket / WebTransport? support for HTTP/2 & HTTP/3
- io_uring support for HTTP/3
  • Loading branch information
Icelk committed Jul 19, 2023
1 parent 09c5d25 commit 541d7bb
Show file tree
Hide file tree
Showing 11 changed files with 733 additions and 385 deletions.
11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ bytes = "1"
compact_str = "0.7.0"
log = "0.4"
time = { version = "0.3", features = ["parsing", "formatting", "macros"] }
socket2 = { version = "0.5.3", optional = true, features = ["all"] }

h2 = { version = "0.3.17", default-features = false, optional = true }
http = "0.2"
Expand Down Expand Up @@ -74,6 +75,11 @@ tokio-tungstenite = { version = "0.19", optional = true, default-features = fals
sha-1 = { version = "0.10", optional = true }
futures-util = { version = "0.3", optional = true, default-features = false, features = ["sink"] }

# HTTP/3
h3 = { version = "0.0.2", optional = true }
h3-quinn = { version = "0.0.3", optional = true }
quinn = { version = "0.10.1", default-features = false, features = ["tls-rustls", "log", "runtime-tokio"], optional = true }

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2", default-features = false }

Expand All @@ -92,9 +98,10 @@ br = ["brotli"]
gzip = ["flate2"]

# HTTP standards
all-http = ["https", "http2"]
all-http = ["https", "http2", "http3"]
https = ["rustls", "rustls-pemfile", "webpki", "async-networking"]
http2 = ["h2", "https"]
http3 = ["h3", "h3-quinn", "quinn", "https"]

# Graceful shutdown; shutdown.rs
graceful-shutdown = ["handover"]
Expand All @@ -111,7 +118,7 @@ nonce = ["rand", "base64", "memchr"]
websocket = ["tokio-tungstenite", "sha-1", "base64", "futures-util"]

# Use tokio's async networking instead of the blocking variant.
async-networking = ["tokio/net"]
async-networking = ["tokio/net", "socket2"]

uring = ["tokio-uring", "kvarn_signal/uring", "async-networking"]

Expand Down
12 changes: 6 additions & 6 deletions extensions/src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn mount(extensions: &mut Extensions, manager: SmartPush) -> &mut Extensions
pub fn always<'a>(
request: &'a FatRequest,
host: &'a Host,
response_pipe: &'a mut application::ResponsePipe,
response_pipe: &'a mut application::ResponseBodyPipe,
bytes: Bytes,
addr: SocketAddr,
) -> RetFut<'a, ()> {
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Default for SmartPush {
async fn push<'a>(
request: &'a FatRequest,
host: &'a Host,
response_pipe: &'a mut application::ResponsePipe,
response_pipe: &'a mut application::ResponseBodyPipe,
bytes: Bytes,
addr: SocketAddr,
manager: Option<&'a Mutex<SmartPush>>,
Expand All @@ -113,9 +113,9 @@ async fn push<'a>(
// let request = unsafe { request.get_inner() };
// let response_pipe = unsafe { response_pipe.get_inner() };

// If it is not HTTP/1
// If it is not HTTP/2
#[allow(irrefutable_let_patterns)]
if let ResponsePipe::Http1(_) = &response_pipe {
if !matches!(response_pipe, ResponseBodyPipe::Http2(_, _)) {
return;
}

Expand Down Expand Up @@ -211,7 +211,7 @@ async fn push<'a>(

let empty_request = utils::empty_clone_request(&push_request);

let mut response_pipe = match response_pipe.push_request(empty_request) {
let response_pipe = match response_pipe.push_request(empty_request) {
Ok(pipe) => pipe,
Err(_) => return,
};
Expand All @@ -221,7 +221,7 @@ async fn push<'a>(

let response = kvarn::handle_cache(&mut push_request, addr, host).await;

if let Err(err) = kvarn::SendKind::Push(&mut response_pipe)
if let Err(err) = kvarn::SendKind::Push(response_pipe)
.send(response, request, host, addr)
.await
{
Expand Down
101 changes: 56 additions & 45 deletions extensions/src/reverse-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,6 @@ pub mod async_bits {
}
};
}
macro_rules! ret_ready_err {
($poll: expr) => {
match $poll {
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(r) => Poll::Ready(r),
_ => $poll,
}
};
($poll: expr, $map: expr) => {
match $poll {
Poll::Ready(Err(e)) => return Poll::Ready(Err($map(e))),
Poll::Ready(r) => Poll::Ready(r),
_ => Poll::Pending,
}
};
}

#[derive(Debug)]
pub struct CopyBuffer {
Expand Down Expand Up @@ -251,45 +235,72 @@ impl OpenBackError {
}
}
}
pub struct ByteProxy<'a, F: AsyncRead + AsyncWrite + Unpin, B: AsyncRead + AsyncWrite + Unpin> {
front: &'a mut F,
pub struct ByteProxy<'a, B: AsyncRead + AsyncWrite + Unpin> {
front: &'a mut ResponseBodyPipe,
back: &'a mut B,
// ToDo: Optimize to one buffer!
front_buf: CopyBuffer,
back_buf: CopyBuffer,
buf: Vec<u8>,
}
impl<'a, F: AsyncRead + AsyncWrite + Unpin, B: AsyncRead + AsyncWrite + Unpin> ByteProxy<'a, F, B> {
pub fn new(front: &'a mut F, back: &'a mut B) -> Self {
impl<'a, B: AsyncRead + AsyncWrite + Unpin> ByteProxy<'a, B> {
pub fn new(front: &'a mut ResponseBodyPipe, back: &'a mut B) -> Self {
Self {
front,
back,
front_buf: CopyBuffer::new(),
back_buf: CopyBuffer::new(),
buf: Vec::with_capacity(16 * 1024),
}
}
pub fn poll_channel(&mut self, cx: &mut Context) -> Poll<Result<(), OpenBackError>> {
macro_rules! copy_from_to {
($reader: expr, $error: expr, $buf: expr, $writer: expr) => {
if let Poll::Ready(Ok(pipe_closed)) = ret_ready_err!(
$buf.poll_copy(cx, Pin::new($reader), Pin::new($writer)),
$error
) {
if pipe_closed {
return Poll::Ready(Err(OpenBackError::Closed));
} else {
return Poll::Ready(Ok(()));
pub async fn channel(&mut self) -> Result<(), OpenBackError> {
let mut front_done = false;
let mut back_done = false;
loop {
if !front_done {
if let ResponseBodyPipe::Http1(h1) = self.front {
{
unsafe { self.buf.set_len(self.buf.capacity()) };
let read = h1
.lock()
.await
.read(&mut self.buf)
.await
.map_err(OpenBackError::Front)?;
if read == 0 {
front_done = true;
}
unsafe { self.buf.set_len(read) };
}
};
};
}

copy_from_to!(self.back, OpenBackError::Back, self.front_buf, self.front);
copy_from_to!(self.front, OpenBackError::Front, self.back_buf, self.back);
self.back
.write_all(&self.buf)
.await
.map_err(OpenBackError::Back)?;
} else {
front_done = true;
}
}
if !back_done {
{
unsafe { self.buf.set_len(self.buf.capacity()) };
let read = self
.back
.read(&mut self.buf)
.await
.map_err(OpenBackError::Back)?;
if read == 0 {
back_done = true;
}
unsafe { self.buf.set_len(read) };
}
self.front
.send(Bytes::copy_from_slice(&self.buf))
.await
.map_err(io::Error::from)
.map_err(OpenBackError::Front)?;
}

Poll::Pending
}
pub async fn channel(&mut self) -> Result<(), OpenBackError> {
futures_util::future::poll_fn(|cx| self.poll_channel(cx)).await
if front_done && back_done {
break;
}
}
Ok(())
}
}

Expand Down
27 changes: 10 additions & 17 deletions roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@ Info on changes in older versions are available at the [changelog](CHANGELOG.md)
> The work will be taking place in branches, named after the target release. The order of these feature releases are not set in stone;
> the features of 0.7.0 might come out as version 0.6.0
# v0.6.0 HTTP/3
# v0.6.0 edgebleed

This is where Kvarn turns into a cutting-edge web server.

> Kvarn already has a good flexible design, so adding this is largely making
> a glue-crate to make HTTP/3 accessible like HTTP/2 is in the `h2` crate.
## v0.8.0 io_uring

Use Linux's new `io_uring` interface for handling networking and IO on Linux.
This should improve performance and power efficiency. This is merged into v0.6.0.

## To do

_Well..._

- [ ] HTTP/3 crate
- [ ] HTTP/3 support in Kvarn
- [ ] cfg to disable new feature
- [x] HTTP/3 support in Kvarn
- [x] cfg to disable new feature
- [x] io_uring support
- [ ] io_uring support for HTTP/3

# v0.7.0 DynLan

Expand All @@ -43,16 +49,3 @@ Another challenge is isolating requests while using one VM.
- [ ] cfg
- [ ] PHP bindings
- [ ] PHP crate

# v0.8.0 io_uring

Use Linux's new `io_uring` interface for handling networking and IO on Linux.
This should improve performance and power efficiency.

## To do

- [ ] Wait for [`tokio-uring`](https://docs.rs/tokio-uring) to add multithreading support
- [ ] Or support an entirely different runtime (e.g. [`monoio`](https://github.com/bytedance/monoio)
(it shouldn't be an issue that it's developed by ByteDance? Are be being tracked?))
- [ ] Investigate compatibility issues with ecosystem. Actual implementation should be fine
(the `net` feature in `tokio` is already optional)
Loading

0 comments on commit 541d7bb

Please sign in to comment.