From 3f0fbda22c228a99203ba91f18ffdc96b3319692 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 14 Sep 2021 16:50:41 +0100 Subject: [PATCH 01/13] Add a feature and update hyper_example to make it easier to use Soketto with http types --- .github/workflows/ci.yml | 4 +- Cargo.toml | 6 +++ examples/hyper_server.rs | 109 ++++++++++--------------------------- src/handshake.rs | 16 ++++++ src/handshake/http.rs | 114 +++++++++++++++++++++++++++++++++++++++ src/handshake/server.rs | 29 ++++++---- 6 files changed, 186 insertions(+), 92 deletions(-) create mode 100644 src/handshake/http.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 85722040..334fc01b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,7 @@ name: Rust on: push: - # Run jobs when commits are pushed to + # Run jobs when commits are pushed to # develop or release-like branches: branches: - develop @@ -40,7 +40,7 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: check - args: --all-targets + args: --all-targets --all-features fmt: name: Run rustfmt diff --git a/Cargo.toml b/Cargo.toml index ab15c2f1..b8bab2c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ httparse = { default-features = false, features = ["std"], version = "1.3.4" } log = { default-features = false, version = "0.4.8" } rand = { default-features = false, features = ["std", "std_rng"], version = "0.8" } sha-1 = { default-features = false, version = "0.9" } +http = { default-features = false, version = "0.2", optional = true } [dev-dependencies] quickcheck = "0.9" @@ -33,3 +34,8 @@ tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } tokio-stream = { version = "0.1", features = ["net"] } hyper = { version = "0.14.10", features = ["full"] } + +[[example]] +name = "hyper_server" +required-features = ["http"] + diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index 25ebabeb..af8c47c2 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -23,7 +23,14 @@ use futures::io::{BufReader, BufWriter}; use hyper::{Body, Request, Response}; -use soketto::{handshake, BoxedError}; +use soketto::{ + handshake::{ + self, + http::{upgrade_request, UpgradeError, Upgraded}, + server, + }, + BoxedError, +}; use tokio_util::compat::TokioAsyncReadCompatExt; /// Start up a hyper server. @@ -43,50 +50,33 @@ async fn main() -> Result<(), BoxedError> { /// Handle incoming HTTP Requests. async fn handler(req: Request) -> Result, BoxedError> { - // If the request is asking to be upgraded to a websocket connection, we do that, - // and we handle the websocket connection (in this case, by echoing messages back): - if is_upgrade_request(&req) { - let (res, on_upgrade) = upgrade_to_websocket(req)?; - tokio::spawn(async move { - if let Err(e) = websocket_echo_messages(on_upgrade).await { - eprintln!("Error upgrading to websocket connection: {}", e); - } - }); - Ok(res) - } - // Or, we can handle the request as a standard HTTP request: - else { - Ok(Response::new(Body::from("Hello HTTP!"))) - } -} - -/// Return the response to the upgrade request, and a way to get hold of the underlying TCP stream -fn upgrade_to_websocket(req: Request) -> Result<(Response, hyper::upgrade::OnUpgrade), handshake::Error> { - let key = req.headers().get("Sec-WebSocket-Key").ok_or(handshake::Error::InvalidSecWebSocketAccept)?; - if req.headers().get("Sec-WebSocket-Version").map(|v| v.as_bytes()) != Some(b"13") { - return Err(handshake::Error::HeaderNotFound("Sec-WebSocket-Version".into())); + match upgrade_request(&req) { + // The upgrade was successful, so we return the response and start up a server to take charge of the socket: + Ok(Upgraded { response, server_configuration }) => { + tokio::spawn(async move { + let on_upgrade = hyper::upgrade::on(req); + if let Err(e) = websocket_echo_messages(on_upgrade, server_configuration).await { + eprintln!("Error upgrading to websocket connection: {}", e); + } + }); + Ok(response.map(|()| Body::empty())) + } + // We tried to upgrade and failed; tell the client about the failure however we like: + Err(UpgradeError::HandshakeError(_e)) => Ok(Response::new(Body::from("Something went wrong upgrading!"))), + // The request wasn't an upgrade request; let's treat it as a standard HTTP request: + Err(UpgradeError::NotAnUpgradeRequest) => Ok(Response::new(Body::from("Hello HTTP!"))), } - - // Just a little ceremony we need to go through to return the correct response key: - let mut accept_key_buf = [0; 32]; - let accept_key = generate_websocket_accept_key(key.as_bytes(), &mut accept_key_buf); - - let response = Response::builder() - .status(hyper::StatusCode::SWITCHING_PROTOCOLS) - .header(hyper::header::CONNECTION, "upgrade") - .header(hyper::header::UPGRADE, "websocket") - .header("Sec-WebSocket-Accept", accept_key) - .body(Body::empty()) - .expect("bug: failed to build response"); - - Ok((response, hyper::upgrade::on(req))) } /// Echo any messages we get from the client back to them -async fn websocket_echo_messages(on_upgrade: hyper::upgrade::OnUpgrade) -> Result<(), BoxedError> { +async fn websocket_echo_messages( + on_upgrade: hyper::upgrade::OnUpgrade, + server_configuration: server::ServerConfiguration, +) -> Result<(), BoxedError> { // Wait for the request to upgrade, and pass the stream we get back to Soketto to handle the WS connection: let stream = on_upgrade.await?; - let server = handshake::Server::new(BufReader::new(BufWriter::new(stream.compat()))); + let mut server = handshake::Server::new(BufReader::new(BufWriter::new(stream.compat()))); + server.configure(server_configuration)?; let (mut sender, mut receiver) = server.into_builder().finish(); // Echo any received messages back to the client: @@ -118,44 +108,3 @@ async fn websocket_echo_messages(on_upgrade: hyper::upgrade::OnUpgrade) -> Resul Ok(()) } - -/// Defined in RFC 6455. this is how we convert the Sec-WebSocket-Key in a request into a -/// Sec-WebSocket-Accept that we return in the response. -fn generate_websocket_accept_key<'a>(key: &[u8], buf: &'a mut [u8; 32]) -> &'a [u8] { - // Defined in RFC 6455, we append this to the key to generate the response: - const KEY: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - - use sha1::{Digest, Sha1}; - let mut digest = Sha1::new(); - digest.update(key); - digest.update(KEY); - let d = digest.finalize(); - - let n = base64::encode_config_slice(&d, base64::STANDARD, buf); - &buf[..n] -} - -/// Check if a request is a websocket upgrade request. -pub fn is_upgrade_request(request: &hyper::Request) -> bool { - header_contains_value(request.headers(), hyper::header::CONNECTION, b"upgrade") - && header_contains_value(request.headers(), hyper::header::UPGRADE, b"websocket") -} - -/// Check if there is a header of the given name containing the wanted value. -fn header_contains_value(headers: &hyper::HeaderMap, header: hyper::header::HeaderName, value: &[u8]) -> bool { - pub fn trim(x: &[u8]) -> &[u8] { - let from = match x.iter().position(|x| !x.is_ascii_whitespace()) { - Some(i) => i, - None => return &[], - }; - let to = x.iter().rposition(|x| !x.is_ascii_whitespace()).unwrap(); - &x[from..=to] - } - - for header in headers.get_all(header) { - if header.as_bytes().split(|&c| c == b',').any(|x| trim(x).eq_ignore_ascii_case(value)) { - return true; - } - } - false -} diff --git a/src/handshake.rs b/src/handshake.rs index 433430a2..1eee52b1 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -11,10 +11,13 @@ //! [handshake]: https://tools.ietf.org/html/rfc6455#section-4 pub mod client; +#[cfg(feature = "http")] +pub mod http; pub mod server; use crate::extension::{Extension, Param}; use bytes::BytesMut; +use sha1::{Digest, Sha1}; use std::{fmt, io, str}; pub use client::{Client, ServerResponse}; @@ -121,6 +124,19 @@ where } } +/// This function takes a 16 byte key (base64 encoded, and so 24 bytes of input) that is expected via +/// the `Sec-WebSocket-Key` header during a websocket handshake, and a 32 byte output buffer, and +/// writes the response that's expected to be handed back in the response header `Sec-WebSocket-Accept`. +fn generate_accept_key<'k>(key_base64: &[u8; 24], output_buf: &'k mut [u8; 32]) -> &'k [u8] { + let mut digest = Sha1::new(); + digest.update(key_base64); + digest.update(KEY); + let d = digest.finalize(); + + let n = base64::encode_config_slice(&d, base64::STANDARD, output_buf); + &output_buf[..n] +} + /// Enumeration of possible handshake errors. #[non_exhaustive] #[derive(Debug)] diff --git a/src/handshake/http.rs b/src/handshake/http.rs new file mode 100644 index 00000000..c42d980b --- /dev/null +++ b/src/handshake/http.rs @@ -0,0 +1,114 @@ +// Copyright (c) 2019 Parity Technologies (UK) Ltd. +// +// Licensed under the Apache License, Version 2.0 +// or the MIT +// license , at your +// option. All files in the project carrying such notice may not be copied, +// modified, or distributed except according to those terms. + +//! This module exposes a utility method and a couple of related types to +//! make it easier to upgrade an [`http::Request`] into a Soketto Socket +//! connection. Take a look at the `examples/hyper_server` example to see it +//! in use. + +use super::SEC_WEBSOCKET_EXTENSIONS; +use crate::handshake; +use crate::handshake::server::ServerConfiguration; +use http::{header, HeaderMap, Response}; +use std::convert::TryInto; + +/// An error attempting to upgrade the [`http::Request`] +pub enum UpgradeError { + /// The [`http::Request`] provided wasn't a socket upgrade request. + NotAnUpgradeRequest, + /// A [`handshake::Error`] encountered attempting to upgrade the request. + HandshakeError(handshake::Error), +} + +impl From for UpgradeError { + fn from(e: handshake::Error) -> Self { + UpgradeError::HandshakeError(e) + } +} + +/// This is handed back on a successful call to [`upgrade_request`]. +pub struct Upgraded { + /// This should be handed back to the client to complete the upgrade. + pub response: http::Response<()>, + /// This should be passed to a [`handshake::Server`] once it's been + /// constructed, to configure it according to this request. + pub server_configuration: ServerConfiguration, +} + +/// Upgrade the provided [`http::Request`] to a socket connection. This returns an [`http::Response`] +/// that should be sent back to the client, as well as a [`ServerConfiguration`] struct which can be +/// handed to a Soketto server to configure its extensions/protocols based on this request. +pub fn upgrade_request(req: &http::Request) -> Result { + if !is_upgrade_request(&req) { + return Err(UpgradeError::NotAnUpgradeRequest); + } + + let key = match req.headers().get("Sec-WebSocket-Key") { + Some(key) => key, + None => { + return Err(handshake::Error::HeaderNotFound("Sec-WebSocket-Key".into()).into()); + } + }; + + if req.headers().get("Sec-WebSocket-Version").map(|v| v.as_bytes()) != Some(b"13") { + return Err(handshake::Error::HeaderNotFound("Sec-WebSocket-Version".into()).into()); + } + + // Given the bytes provided in Sec-WebSocket-Key, generate the bytes to return in Sec-WebSocket-Accept: + let mut accept_key_buf = [0; 32]; + let key = match key.as_bytes().try_into() { + Ok(key) => key, + Err(_) => return Err(UpgradeError::HandshakeError(handshake::Error::InvalidSecWebSocketAccept)), + }; + let accept_key = handshake::generate_accept_key(key, &mut accept_key_buf); + + // Get extension information out of the request. + let extension_config = req + .headers() + .iter() + .filter(|&(name, _)| name.as_str().eq_ignore_ascii_case(SEC_WEBSOCKET_EXTENSIONS)) + .map(|(_, value)| Ok(std::str::from_utf8(value.as_bytes())?.to_string())) + .collect::, handshake::Error>>()?; + let server_configuration = ServerConfiguration { extension_config }; + + // Build a response that should be sent back to the client to acknowledge the upgrade. + let response = Response::builder() + .status(http::StatusCode::SWITCHING_PROTOCOLS) + .header(http::header::CONNECTION, "upgrade") + .header(http::header::UPGRADE, "websocket") + .header("Sec-WebSocket-Accept", accept_key) + .body(()) + .expect("bug: failed to build response"); + + Ok(Upgraded { response, server_configuration }) +} + +/// Check if a request looks like a websocket upgrade request. +fn is_upgrade_request(request: &http::Request) -> bool { + header_contains_value(request.headers(), header::CONNECTION, b"upgrade") + && header_contains_value(request.headers(), header::UPGRADE, b"websocket") +} + +/// Check if there is a header of the given name containing the wanted value. +fn header_contains_value(headers: &HeaderMap, header: header::HeaderName, value: &[u8]) -> bool { + pub fn trim(x: &[u8]) -> &[u8] { + let from = match x.iter().position(|x| !x.is_ascii_whitespace()) { + Some(i) => i, + None => return &[], + }; + let to = x.iter().rposition(|x| !x.is_ascii_whitespace()).unwrap(); + &x[from..=to] + } + + for header in headers.get_all(header) { + if header.as_bytes().split(|&c| c == b',').any(|x| trim(x).eq_ignore_ascii_case(value)) { + return true; + } + } + false +} diff --git a/src/handshake/server.rs b/src/handshake/server.rs index a4ba6b13..dfe0837a 100644 --- a/src/handshake/server.rs +++ b/src/handshake/server.rs @@ -11,14 +11,13 @@ //! [handshake]: https://tools.ietf.org/html/rfc6455#section-4 use super::{ - append_extensions, configure_extensions, expect_ascii_header, with_first_header, Error, WebSocketKey, KEY, + append_extensions, configure_extensions, expect_ascii_header, with_first_header, Error, WebSocketKey, MAX_NUM_HEADERS, SEC_WEBSOCKET_EXTENSIONS, SEC_WEBSOCKET_PROTOCOL, }; use crate::connection::{self, Mode}; use crate::extension::Extension; use bytes::BytesMut; use futures::prelude::*; -use sha1::{Digest, Sha1}; use std::{mem, str}; // Most HTTP servers default to 8KB limit on headers @@ -37,6 +36,13 @@ pub struct Server<'a, T> { buffer: BytesMut, } +/// Server configuration can't be generated by hand. Using the `http` feature, this is returned +/// from `crate::handshake::http::upgrade_request`, and can be handed to [`Server::configure`] +/// in order to configure the server instance appropriately based on the request headers. +pub struct ServerConfiguration { + pub(super) extension_config: Vec, +} + impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { /// Create a new server handshake. pub fn new(socket: T) -> Self { @@ -126,6 +132,16 @@ impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { self.socket } + /// Configure the server based on the [`ServerConfiguration`] payload provided. This + /// is handled automatically if using [`Server::receive_request`] to handle the + /// incoming request. + pub fn configure(&mut self, configuration: ServerConfiguration) -> Result<(), Error> { + for config_str in configuration.extension_config { + configure_extensions(&mut self.extensions, &config_str)?; + } + Ok(()) + } + // Decode client handshake request. fn decode_request(&mut self) -> Result { let mut header_buf = [httparse::EMPTY_HEADER; MAX_NUM_HEADERS]; @@ -188,14 +204,7 @@ impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { match response { Response::Accept { key, protocol } => { let mut key_buf = [0; 32]; - let accept_value = { - let mut digest = Sha1::new(); - digest.update(key); - digest.update(KEY); - let d = digest.finalize(); - let n = base64::encode_config_slice(&d, base64::STANDARD, &mut key_buf); - &key_buf[..n] - }; + let accept_value = super::generate_accept_key(&key, &mut key_buf); self.buffer.extend_from_slice( concat![ "HTTP/1.1 101 Switching Protocols", From 6c980721eed8702bee1fc9dfd7f6816691fac44d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 16 Sep 2021 14:09:07 +0100 Subject: [PATCH 02/13] rethink 1 (but it turns out this won't work..) --- examples/hyper_server.rs | 66 +++++++++++++++++++-------- src/handshake.rs | 21 +++++++++ src/handshake/http.rs | 97 ++++++++++++++++++++++++---------------- src/handshake/server.rs | 48 +++++++++----------- 4 files changed, 147 insertions(+), 85 deletions(-) diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index af8c47c2..e4951e14 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -26,8 +26,7 @@ use hyper::{Body, Request, Response}; use soketto::{ handshake::{ self, - http::{upgrade_request, UpgradeError, Upgraded}, - server, + http::{negotiate_upgrade, NegotiationError}, }, BoxedError, }; @@ -42,41 +41,70 @@ async fn main() -> Result<(), BoxedError> { hyper::service::make_service_fn(|_| async { Ok::<_, hyper::Error>(hyper::service::service_fn(handler)) }); let server = hyper::Server::bind(&addr).serve(service); - println!("Listening on http://{}", server.local_addr()); + println!("Listening on http://{} — connect and I'll echo back anything you send!", server.local_addr()); server.await?; Ok(()) } +/// Our SokettoServer in this example is basically a hyper `Upgraded` stream with a BufReader and BufWriter wrapped +/// around it for performance. +type SokettoServer<'a> = + handshake::Server<'a, BufReader>>>; + /// Handle incoming HTTP Requests. async fn handler(req: Request) -> Result, BoxedError> { - match upgrade_request(&req) { - // The upgrade was successful, so we return the response and start up a server to take charge of the socket: - Ok(Upgraded { response, server_configuration }) => { + match negotiate_upgrade(&req) { + // The upgrade request was successful, so reply with the appropriate response and start a Soketto server up: + Ok(upgrade_negotiation) => { +// BUG: So, this all looks good in theory, and gives us a way to neatly configure a server based on the incoming request, +// and ensure that the response contains the correct information. BUT! `hyper::upgrade::on` waits until the response is sent +// (I think) until it hands back the stream, so it doesn't actually work! Will need a(nother) rethink.. + + // The negotiation to upgrade to a WebSocket connection has been successful so far. Next, we get back the underlying + // stream using `hyper::upgrade::on`, and hand this to a Soketto server to use to handle the WebSocket communication + // on this socket. + let stream = hyper::upgrade::on(req).await?; + let mut server = handshake::Server::new(BufReader::new(BufWriter::new(stream.compat()))); + + // Now that we have a server, we can add extensions to it if we like. + #[cfg(feature = "deflate")] + { + let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server); + server.add_extension(Box::new(deflate)); + } + + // Given a configured server, we can now conclude our upgrade negotiation by getting back a response to hand back to the client. + // This may lead to some configuration of enabled extensions based on the incoming request params, and will fail if the configuration + // parameters are invalid for the extionsions in question. + let response = match upgrade_negotiation.into_response(&mut server) { + Ok(res) => res, + Err(_e) => { + println!("aaah {:?}", _e); + return Ok(Response::new(Body::from("Something went wrong upgrading!"))) + }, + }; + + // Spawn off a task to handle the long running socket server that we've established. tokio::spawn(async move { - let on_upgrade = hyper::upgrade::on(req); - if let Err(e) = websocket_echo_messages(on_upgrade, server_configuration).await { + if let Err(e) = websocket_echo_messages(server).await { eprintln!("Error upgrading to websocket connection: {}", e); } }); + + // Return the response ("fixing" the body type, since `negotiate_upgrade` doesn't know about `hyper::Body`). Ok(response.map(|()| Body::empty())) } - // We tried to upgrade and failed; tell the client about the failure however we like: - Err(UpgradeError::HandshakeError(_e)) => Ok(Response::new(Body::from("Something went wrong upgrading!"))), + // We tried to upgrade and failed early on; tell the client about the failure however we like: + Err(NegotiationError::HandshakeError(_e)) => Ok(Response::new(Body::from("Something went wrong upgrading!"))), // The request wasn't an upgrade request; let's treat it as a standard HTTP request: - Err(UpgradeError::NotAnUpgradeRequest) => Ok(Response::new(Body::from("Hello HTTP!"))), + Err(NegotiationError::NotAnUpgradeRequest) => Ok(Response::new(Body::from("Hello HTTP!"))), } } /// Echo any messages we get from the client back to them -async fn websocket_echo_messages( - on_upgrade: hyper::upgrade::OnUpgrade, - server_configuration: server::ServerConfiguration, -) -> Result<(), BoxedError> { - // Wait for the request to upgrade, and pass the stream we get back to Soketto to handle the WS connection: - let stream = on_upgrade.await?; - let mut server = handshake::Server::new(BufReader::new(BufWriter::new(stream.compat()))); - server.configure(server_configuration)?; +async fn websocket_echo_messages(server: SokettoServer<'_>) -> Result<(), BoxedError> { + // Get back a reader and writer that we can use to send and receive websocket messages. let (mut sender, mut receiver) = server.into_builder().finish(); // Echo any received messages back to the client: diff --git a/src/handshake.rs b/src/handshake.rs index 1eee52b1..fefb3e04 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -124,6 +124,27 @@ where } } +// Write the extension header value to the given buffer. +fn append_extension_header_value<'a, I>(mut extensions_iter: std::iter::Peekable, bytes: &mut BytesMut) +where + I: Iterator>, +{ + while let Some(e) = extensions_iter.next() { + bytes.extend_from_slice(e.name().as_bytes()); + for p in e.params() { + bytes.extend_from_slice(b"; "); + bytes.extend_from_slice(p.name().as_bytes()); + if let Some(v) = p.value() { + bytes.extend_from_slice(b"="); + bytes.extend_from_slice(v.as_bytes()) + } + } + if extensions_iter.peek().is_some() { + bytes.extend_from_slice(b", ") + } + } +} + /// This function takes a 16 byte key (base64 encoded, and so 24 bytes of input) that is expected via /// the `Sec-WebSocket-Key` header during a websocket handshake, and a 32 byte output buffer, and /// writes the response that's expected to be handed back in the response header `Sec-WebSocket-Accept`. diff --git a/src/handshake/http.rs b/src/handshake/http.rs index c42d980b..68ed9d54 100644 --- a/src/handshake/http.rs +++ b/src/handshake/http.rs @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Parity Technologies (UK) Ltd. +// Copyright (c) 2021 Parity Technologies (UK) Ltd. // // Licensed under the Apache License, Version 2.0 // or the MIT @@ -6,46 +6,79 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. -//! This module exposes a utility method and a couple of related types to -//! make it easier to upgrade an [`http::Request`] into a Soketto Socket -//! connection. Take a look at the `examples/hyper_server` example to see it -//! in use. +/*! +This module exposes the utility method [`upgrade_request`] to make it easier to upgrade +an [`http::Request`] into a Soketto Socket connection. Take a look at the `examples/hyper_server.rs` +example in the crate repository to see this in action. +*/ -use super::SEC_WEBSOCKET_EXTENSIONS; +use super::{Server, SEC_WEBSOCKET_EXTENSIONS}; use crate::handshake; -use crate::handshake::server::ServerConfiguration; use http::{header, HeaderMap, Response}; use std::convert::TryInto; /// An error attempting to upgrade the [`http::Request`] -pub enum UpgradeError { +pub enum NegotiationError { /// The [`http::Request`] provided wasn't a socket upgrade request. NotAnUpgradeRequest, /// A [`handshake::Error`] encountered attempting to upgrade the request. HandshakeError(handshake::Error), } -impl From for UpgradeError { +impl From for NegotiationError { fn from(e: handshake::Error) -> Self { - UpgradeError::HandshakeError(e) + NegotiationError::HandshakeError(e) } } -/// This is handed back on a successful call to [`upgrade_request`]. -pub struct Upgraded { - /// This should be handed back to the client to complete the upgrade. - pub response: http::Response<()>, - /// This should be passed to a [`handshake::Server`] once it's been - /// constructed, to configure it according to this request. - pub server_configuration: ServerConfiguration, +/// This is handed back on a successful call to [`negotiate_upgrade`]. It has one method, +/// [`Negotiation::into_response`], which can be provided a Soketto server, and hands back +/// a response to send to the client, as well as configuring the server extensions as needed +/// based on the request. +pub struct Negotiation { + key: [u8; 24], + extension_config: Vec, +} + +impl Negotiation { + /// Generate an [`http::Response`] to the negotiation request. This should be + /// returned to the client to complete the upgrade negotiation. + pub fn into_response<'a, T>(self, server: &mut Server<'a, T>) -> Result, handshake::Error> { + // Attempt to set the extension configuration params that the client requested. + for config_str in self.extension_config { + handshake::configure_extensions(server.extensions_mut(), &config_str)?; + } + + let mut accept_key_buf = [0; 32]; + let accept_key = handshake::generate_accept_key(&self.key, &mut accept_key_buf); + + // Build a response that should be sent back to the client to acknowledge the upgrade. + let mut response = Response::builder() + .status(http::StatusCode::SWITCHING_PROTOCOLS) + .header(http::header::CONNECTION, "upgrade") + .header(http::header::UPGRADE, "websocket") + .header("Sec-WebSocket-Accept", accept_key); + + // Tell the client about the agreed-upon extension configuration. We reuse code to build up the + // extension header value, but that does make this a little more clunky. + if !server.extensions_mut().is_empty() { + let mut buf = bytes::BytesMut::new(); + let enabled_extensions = server.extensions_mut().iter().filter(|e| e.is_enabled()).peekable(); + handshake::append_extension_header_value(enabled_extensions, &mut buf); + response = response.header("Sec-WebSocket-Extensions", buf.as_ref()); + } + + let response = response.body(()).expect("bug: failed to build response"); + Ok(response) + } } /// Upgrade the provided [`http::Request`] to a socket connection. This returns an [`http::Response`] -/// that should be sent back to the client, as well as a [`ServerConfiguration`] struct which can be +/// that should be sent back to the client, as well as a [`ExtensionConfiguration`] struct which can be /// handed to a Soketto server to configure its extensions/protocols based on this request. -pub fn upgrade_request(req: &http::Request) -> Result { +pub fn negotiate_upgrade(req: &http::Request) -> Result { if !is_upgrade_request(&req) { - return Err(UpgradeError::NotAnUpgradeRequest); + return Err(NegotiationError::NotAnUpgradeRequest); } let key = match req.headers().get("Sec-WebSocket-Key") { @@ -59,33 +92,21 @@ pub fn upgrade_request(req: &http::Request) -> Result key, - Err(_) => return Err(UpgradeError::HandshakeError(handshake::Error::InvalidSecWebSocketAccept)), + Err(_) => return Err(NegotiationError::HandshakeError(handshake::Error::InvalidSecWebSocketAccept)), }; - let accept_key = handshake::generate_accept_key(key, &mut accept_key_buf); - // Get extension information out of the request. + // Get extension information out of the request as we'll need this as well. let extension_config = req .headers() .iter() .filter(|&(name, _)| name.as_str().eq_ignore_ascii_case(SEC_WEBSOCKET_EXTENSIONS)) .map(|(_, value)| Ok(std::str::from_utf8(value.as_bytes())?.to_string())) .collect::, handshake::Error>>()?; - let server_configuration = ServerConfiguration { extension_config }; - - // Build a response that should be sent back to the client to acknowledge the upgrade. - let response = Response::builder() - .status(http::StatusCode::SWITCHING_PROTOCOLS) - .header(http::header::CONNECTION, "upgrade") - .header(http::header::UPGRADE, "websocket") - .header("Sec-WebSocket-Accept", accept_key) - .body(()) - .expect("bug: failed to build response"); - - Ok(Upgraded { response, server_configuration }) + + Ok(Negotiation { key, extension_config }) } /// Check if a request looks like a websocket upgrade request. diff --git a/src/handshake/server.rs b/src/handshake/server.rs index dfe0837a..23ec1299 100644 --- a/src/handshake/server.rs +++ b/src/handshake/server.rs @@ -36,19 +36,7 @@ pub struct Server<'a, T> { buffer: BytesMut, } -/// Server configuration can't be generated by hand. Using the `http` feature, this is returned -/// from `crate::handshake::http::upgrade_request`, and can be handed to [`Server::configure`] -/// in order to configure the server instance appropriately based on the request headers. -pub struct ServerConfiguration { - pub(super) extension_config: Vec, -} - -impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { - /// Create a new server handshake. - pub fn new(socket: T) -> Self { - Server { socket, protocols: Vec::new(), extensions: Vec::new(), buffer: BytesMut::new() } - } - +impl<'a, T> Server<'a, T> { /// Override the buffer to use for request/response handling. pub fn set_buffer(&mut self, b: BytesMut) -> &mut Self { self.buffer = b; @@ -77,6 +65,25 @@ impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { self.extensions.drain(..) } + /// Get out the inner socket of the server. + pub fn into_inner(self) -> T { + self.socket + } + + /// This is not exposed publically, but is useful to allow optional features + /// access to the extensions to configure them. + #[cfg(feature = "http")] + pub(crate) fn extensions_mut(&mut self) -> &mut Vec> { + &mut self.extensions + } +} + +impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { + /// Create a new server handshake. + pub fn new(socket: T) -> Self { + Server { socket, protocols: Vec::new(), extensions: Vec::new(), buffer: BytesMut::new() } + } + /// Await an incoming client handshake request. pub async fn receive_request(&mut self) -> Result, Error> { self.buffer.clear(); @@ -127,21 +134,6 @@ impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { builder } - /// Get out the inner socket of the server. - pub fn into_inner(self) -> T { - self.socket - } - - /// Configure the server based on the [`ServerConfiguration`] payload provided. This - /// is handled automatically if using [`Server::receive_request`] to handle the - /// incoming request. - pub fn configure(&mut self, configuration: ServerConfiguration) -> Result<(), Error> { - for config_str in configuration.extension_config { - configure_extensions(&mut self.extensions, &config_str)?; - } - Ok(()) - } - // Decode client handshake request. fn decode_request(&mut self) -> Result { let mut header_buf = [httparse::EMPTY_HEADER; MAX_NUM_HEADERS]; From 8695672ab4496d894a1b531c993724d313419b65 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 16 Sep 2021 15:10:31 +0100 Subject: [PATCH 03/13] Make a new Server interface instead so that we have an API we can use --- Cargo.toml | 2 +- examples/hyper_server.rs | 89 +++++++++------------ src/handshake/http.rs | 164 ++++++++++++++++++++++----------------- src/handshake/server.rs | 9 +-- 4 files changed, 133 insertions(+), 131 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b8bab2c3..5ddd9aa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ edition = "2018" all-features = true [features] -default = [] +default = ["http"] # todo remove deflate = ["flate2"] [dependencies] diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index e4951e14..55470e22 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -24,10 +24,7 @@ use futures::io::{BufReader, BufWriter}; use hyper::{Body, Request, Response}; use soketto::{ - handshake::{ - self, - http::{negotiate_upgrade, NegotiationError}, - }, + handshake::http::{is_upgrade_request, Server}, BoxedError, }; use tokio_util::compat::TokioAsyncReadCompatExt; @@ -47,65 +44,53 @@ async fn main() -> Result<(), BoxedError> { Ok(()) } -/// Our SokettoServer in this example is basically a hyper `Upgraded` stream with a BufReader and BufWriter wrapped -/// around it for performance. -type SokettoServer<'a> = - handshake::Server<'a, BufReader>>>; - /// Handle incoming HTTP Requests. async fn handler(req: Request) -> Result, BoxedError> { - match negotiate_upgrade(&req) { - // The upgrade request was successful, so reply with the appropriate response and start a Soketto server up: - Ok(upgrade_negotiation) => { -// BUG: So, this all looks good in theory, and gives us a way to neatly configure a server based on the incoming request, -// and ensure that the response contains the correct information. BUT! `hyper::upgrade::on` waits until the response is sent -// (I think) until it hands back the stream, so it doesn't actually work! Will need a(nother) rethink.. - - // The negotiation to upgrade to a WebSocket connection has been successful so far. Next, we get back the underlying - // stream using `hyper::upgrade::on`, and hand this to a Soketto server to use to handle the WebSocket communication - // on this socket. - let stream = hyper::upgrade::on(req).await?; - let mut server = handshake::Server::new(BufReader::new(BufWriter::new(stream.compat()))); + if is_upgrade_request(&req) { + // Create a new handshake server. + let mut server = Server::new(); + + // Add any extensions that we want to use. + #[cfg(feature = "deflate")] + { + let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server); + server.add_extension(Box::new(deflate)); + } - // Now that we have a server, we can add extensions to it if we like. - #[cfg(feature = "deflate")] - { - let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server); - server.add_extension(Box::new(deflate)); + // Attempt begin the handshake. + match server.receive_request(&req) { + // The handshake has been successful so far; return the response we're given back + // and spawn a task to handle the long-running WebSocket server: + Ok(response) => { + tokio::spawn(async move { + if let Err(e) = websocket_echo_messages(server, req).await { + eprintln!("Error upgrading to websocket connection: {}", e); + } + }); + Ok(response.map(|()| Body::empty())) } - - // Given a configured server, we can now conclude our upgrade negotiation by getting back a response to hand back to the client. - // This may lead to some configuration of enabled extensions based on the incoming request params, and will fail if the configuration - // parameters are invalid for the extionsions in question. - let response = match upgrade_negotiation.into_response(&mut server) { - Ok(res) => res, - Err(_e) => { - println!("aaah {:?}", _e); - return Ok(Response::new(Body::from("Something went wrong upgrading!"))) - }, - }; - - // Spawn off a task to handle the long running socket server that we've established. - tokio::spawn(async move { - if let Err(e) = websocket_echo_messages(server).await { - eprintln!("Error upgrading to websocket connection: {}", e); - } - }); - - // Return the response ("fixing" the body type, since `negotiate_upgrade` doesn't know about `hyper::Body`). - Ok(response.map(|()| Body::empty())) + // We tried to upgrade and failed early on; tell the client about the failure however we like: + Err(_e) => Ok(Response::new(Body::from("Something went wrong upgrading!"))), } - // We tried to upgrade and failed early on; tell the client about the failure however we like: - Err(NegotiationError::HandshakeError(_e)) => Ok(Response::new(Body::from("Something went wrong upgrading!"))), + } else { // The request wasn't an upgrade request; let's treat it as a standard HTTP request: - Err(NegotiationError::NotAnUpgradeRequest) => Ok(Response::new(Body::from("Hello HTTP!"))), + Ok(Response::new(Body::from("Hello HTTP!"))) } } /// Echo any messages we get from the client back to them -async fn websocket_echo_messages(server: SokettoServer<'_>) -> Result<(), BoxedError> { +async fn websocket_echo_messages(server: Server, req: Request) -> Result<(), BoxedError> { + // The negotiation to upgrade to a WebSocket connection has been successful so far. Next, we get back the underlying + // stream using `hyper::upgrade::on`, and hand this to a Soketto server to use to handle the WebSocket communication + // on this socket. + // + // Note: awaiting this won't succeed until the handshake response has been returned to the client, so this must be + // spawned on a separate task so as not to block that response being handed back. + let stream = hyper::upgrade::on(req).await?; + let stream = BufReader::new(BufWriter::new(stream.compat())); + // Get back a reader and writer that we can use to send and receive websocket messages. - let (mut sender, mut receiver) = server.into_builder().finish(); + let (mut sender, mut receiver) = server.into_builder(stream).finish(); // Echo any received messages back to the client: let mut message = Vec::new(); diff --git a/src/handshake/http.rs b/src/handshake/http.rs index 68ed9d54..a41c4a11 100644 --- a/src/handshake/http.rs +++ b/src/handshake/http.rs @@ -7,50 +7,102 @@ // modified, or distributed except according to those terms. /*! -This module exposes the utility method [`upgrade_request`] to make it easier to upgrade -an [`http::Request`] into a Soketto Socket connection. Take a look at the `examples/hyper_server.rs` -example in the crate repository to see this in action. +This module somewhat mirrors [`crate::handshake::server`], expect it's focus is on handling +externally provided [`http`] types, making it easier to integrate with external web servers +such as [`hyper`]. + +See `examples/hyper_server.rs` from this crate's repository for example usage. */ -use super::{Server, SEC_WEBSOCKET_EXTENSIONS}; +use super::SEC_WEBSOCKET_EXTENSIONS; +use crate::connection::{self, Mode}; +use crate::extension::Extension; use crate::handshake; +use bytes::BytesMut; +use futures::prelude::*; use http::{header, HeaderMap, Response}; use std::convert::TryInto; - -/// An error attempting to upgrade the [`http::Request`] -pub enum NegotiationError { - /// The [`http::Request`] provided wasn't a socket upgrade request. - NotAnUpgradeRequest, - /// A [`handshake::Error`] encountered attempting to upgrade the request. - HandshakeError(handshake::Error), +use std::mem; + +/// A re-export of [`Error`]. +pub type Error = handshake::Error; + +/// Websocket handshake server. This is similar to [`handshake::Server`], but it is +/// focused on performing the WebSocket handshake using a provided [`http::Request`], as opposed +/// to decoding the request internally. +pub struct Server { + /// Extensions the server supports. + extensions: Vec>, + /// Encoding/decoding buffer. + buffer: BytesMut, } -impl From for NegotiationError { - fn from(e: handshake::Error) -> Self { - NegotiationError::HandshakeError(e) +impl Server { + /// Create a new server handshake. + pub fn new() -> Self { + Server { extensions: Vec::new(), buffer: BytesMut::new() } } -} -/// This is handed back on a successful call to [`negotiate_upgrade`]. It has one method, -/// [`Negotiation::into_response`], which can be provided a Soketto server, and hands back -/// a response to send to the client, as well as configuring the server extensions as needed -/// based on the request. -pub struct Negotiation { - key: [u8; 24], - extension_config: Vec, -} + /// Override the buffer to use for request/response handling. + pub fn set_buffer(&mut self, b: BytesMut) -> &mut Self { + self.buffer = b; + self + } -impl Negotiation { - /// Generate an [`http::Response`] to the negotiation request. This should be - /// returned to the client to complete the upgrade negotiation. - pub fn into_response<'a, T>(self, server: &mut Server<'a, T>) -> Result, handshake::Error> { - // Attempt to set the extension configuration params that the client requested. - for config_str in self.extension_config { - handshake::configure_extensions(server.extensions_mut(), &config_str)?; + /// Extract the buffer. + pub fn take_buffer(&mut self) -> BytesMut { + mem::take(&mut self.buffer) + } + + /// Add an extension the server supports. + pub fn add_extension(&mut self, e: Box) -> &mut Self { + self.extensions.push(e); + self + } + + /// Get back all extensions. + pub fn drain_extensions(&mut self) -> impl Iterator> + '_ { + self.extensions.drain(..) + } + + /// Attempt to interpret the provided [`http::Request`] as a WebSocket Upgrade request. If successful, this + /// returns an [`http::Response`] that should be returned to the client to complete the handshake. + pub fn receive_request(&mut self, req: &http::Request) -> Result, Error> { + if !is_upgrade_request(&req) { + return Err(Error::InvalidSecWebSocketAccept); } + let key = match req.headers().get("Sec-WebSocket-Key") { + Some(key) => key, + None => { + return Err(Error::HeaderNotFound("Sec-WebSocket-Key".into()).into()); + } + }; + + if req.headers().get("Sec-WebSocket-Version").map(|v| v.as_bytes()) != Some(b"13") { + return Err(Error::HeaderNotFound("Sec-WebSocket-Version".into()).into()); + } + + // Pull out the Sec-WebSocket-Key and generate the appropriate response to it. + let key: &[u8; 24] = match key.as_bytes().try_into() { + Ok(key) => key, + Err(_) => return Err(Error::InvalidSecWebSocketAccept), + }; let mut accept_key_buf = [0; 32]; - let accept_key = handshake::generate_accept_key(&self.key, &mut accept_key_buf); + let accept_key = handshake::generate_accept_key(key, &mut accept_key_buf); + + // Get extension information out of the request as we'll need this as well. + let extension_config = req + .headers() + .iter() + .filter(|&(name, _)| name.as_str().eq_ignore_ascii_case(SEC_WEBSOCKET_EXTENSIONS)) + .map(|(_, value)| Ok(std::str::from_utf8(value.as_bytes())?.to_string())) + .collect::, Error>>()?; + + // Attempt to set the extension configuration params that the client requested. + for config_str in &extension_config { + handshake::configure_extensions(&mut self.extensions, &config_str)?; + } // Build a response that should be sent back to the client to acknowledge the upgrade. let mut response = Response::builder() @@ -61,9 +113,9 @@ impl Negotiation { // Tell the client about the agreed-upon extension configuration. We reuse code to build up the // extension header value, but that does make this a little more clunky. - if !server.extensions_mut().is_empty() { + if !self.extensions.is_empty() { let mut buf = bytes::BytesMut::new(); - let enabled_extensions = server.extensions_mut().iter().filter(|e| e.is_enabled()).peekable(); + let enabled_extensions = self.extensions.iter().filter(|e| e.is_enabled()).peekable(); handshake::append_extension_header_value(enabled_extensions, &mut buf); response = response.header("Sec-WebSocket-Extensions", buf.as_ref()); } @@ -71,51 +123,23 @@ impl Negotiation { let response = response.body(()).expect("bug: failed to build response"); Ok(response) } -} - -/// Upgrade the provided [`http::Request`] to a socket connection. This returns an [`http::Response`] -/// that should be sent back to the client, as well as a [`ExtensionConfiguration`] struct which can be -/// handed to a Soketto server to configure its extensions/protocols based on this request. -pub fn negotiate_upgrade(req: &http::Request) -> Result { - if !is_upgrade_request(&req) { - return Err(NegotiationError::NotAnUpgradeRequest); - } - let key = match req.headers().get("Sec-WebSocket-Key") { - Some(key) => key, - None => { - return Err(handshake::Error::HeaderNotFound("Sec-WebSocket-Key".into()).into()); - } - }; - - if req.headers().get("Sec-WebSocket-Version").map(|v| v.as_bytes()) != Some(b"13") { - return Err(handshake::Error::HeaderNotFound("Sec-WebSocket-Version".into()).into()); + /// Turn this handshake into a [`connection::Builder`]. + pub fn into_builder(mut self, socket: T) -> connection::Builder { + let mut builder = connection::Builder::new(socket, Mode::Server); + builder.set_buffer(self.buffer); + builder.add_extensions(self.extensions.drain(..)); + builder } - - // Pull out the Sec-WebSocket-Key; we'll need this for our response. - let key: [u8; 24] = match key.as_bytes().try_into() { - Ok(key) => key, - Err(_) => return Err(NegotiationError::HandshakeError(handshake::Error::InvalidSecWebSocketAccept)), - }; - - // Get extension information out of the request as we'll need this as well. - let extension_config = req - .headers() - .iter() - .filter(|&(name, _)| name.as_str().eq_ignore_ascii_case(SEC_WEBSOCKET_EXTENSIONS)) - .map(|(_, value)| Ok(std::str::from_utf8(value.as_bytes())?.to_string())) - .collect::, handshake::Error>>()?; - - Ok(Negotiation { key, extension_config }) } -/// Check if a request looks like a websocket upgrade request. -fn is_upgrade_request(request: &http::Request) -> bool { +// Check if a request looks like a valid websocket upgrade request. +pub fn is_upgrade_request(request: &http::Request) -> bool { header_contains_value(request.headers(), header::CONNECTION, b"upgrade") && header_contains_value(request.headers(), header::UPGRADE, b"websocket") } -/// Check if there is a header of the given name containing the wanted value. +// Check if there is a header of the given name containing the wanted value. fn header_contains_value(headers: &HeaderMap, header: header::HeaderName, value: &[u8]) -> bool { pub fn trim(x: &[u8]) -> &[u8] { let from = match x.iter().position(|x| !x.is_ascii_whitespace()) { diff --git a/src/handshake/server.rs b/src/handshake/server.rs index 23ec1299..f1fcf5eb 100644 --- a/src/handshake/server.rs +++ b/src/handshake/server.rs @@ -24,7 +24,7 @@ use std::{mem, str}; const MAX_HEADERS_SIZE: usize = 8 * 1024; const BLOCK_SIZE: usize = 8 * 1024; -/// Websocket handshake client. +/// Websocket handshake server. #[derive(Debug)] pub struct Server<'a, T> { socket: T, @@ -69,13 +69,6 @@ impl<'a, T> Server<'a, T> { pub fn into_inner(self) -> T { self.socket } - - /// This is not exposed publically, but is useful to allow optional features - /// access to the extensions to configure them. - #[cfg(feature = "http")] - pub(crate) fn extensions_mut(&mut self) -> &mut Vec> { - &mut self.extensions - } } impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { From 17987b76c30576079907405c41715623b3186bad Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 16 Sep 2021 15:16:37 +0100 Subject: [PATCH 04/13] remove the default_feature --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5ddd9aa0..b8bab2c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ edition = "2018" all-features = true [features] -default = ["http"] # todo remove +default = [] deflate = ["flate2"] [dependencies] From a0593b2c272d2351838c04a83cd007ab10c13aed Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 16 Sep 2021 15:22:17 +0100 Subject: [PATCH 05/13] dedupe code in handshake and undo changes in handshake::server to minimise diff --- src/handshake.rs | 15 +-------------- src/handshake/server.rs | 24 +++++++++++------------- 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/src/handshake.rs b/src/handshake.rs index fefb3e04..4894878d 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -108,20 +108,7 @@ where bytes.extend_from_slice(b"\r\nSec-WebSocket-Extensions: ") } - while let Some(e) = iter.next() { - bytes.extend_from_slice(e.name().as_bytes()); - for p in e.params() { - bytes.extend_from_slice(b"; "); - bytes.extend_from_slice(p.name().as_bytes()); - if let Some(v) = p.value() { - bytes.extend_from_slice(b"="); - bytes.extend_from_slice(v.as_bytes()) - } - } - if iter.peek().is_some() { - bytes.extend_from_slice(b", ") - } - } + append_extension_header_value(iter, bytes) } // Write the extension header value to the given buffer. diff --git a/src/handshake/server.rs b/src/handshake/server.rs index f1fcf5eb..648d6820 100644 --- a/src/handshake/server.rs +++ b/src/handshake/server.rs @@ -36,7 +36,12 @@ pub struct Server<'a, T> { buffer: BytesMut, } -impl<'a, T> Server<'a, T> { +impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { + /// Create a new server handshake. + pub fn new(socket: T) -> Self { + Server { socket, protocols: Vec::new(), extensions: Vec::new(), buffer: BytesMut::new() } + } + /// Override the buffer to use for request/response handling. pub fn set_buffer(&mut self, b: BytesMut) -> &mut Self { self.buffer = b; @@ -65,18 +70,6 @@ impl<'a, T> Server<'a, T> { self.extensions.drain(..) } - /// Get out the inner socket of the server. - pub fn into_inner(self) -> T { - self.socket - } -} - -impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { - /// Create a new server handshake. - pub fn new(socket: T) -> Self { - Server { socket, protocols: Vec::new(), extensions: Vec::new(), buffer: BytesMut::new() } - } - /// Await an incoming client handshake request. pub async fn receive_request(&mut self) -> Result, Error> { self.buffer.clear(); @@ -127,6 +120,11 @@ impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { builder } + /// Get out the inner socket of the server. + pub fn into_inner(self) -> T { + self.socket + } + // Decode client handshake request. fn decode_request(&mut self) -> Result { let mut header_buf = [httparse::EMPTY_HEADER; MAX_NUM_HEADERS]; From 41d76786f7794cff20a059b60e7782cfc339dade Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 16 Sep 2021 15:36:17 +0100 Subject: [PATCH 06/13] Add link to RFC explaining Sec-Websocket-Key --- src/handshake.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/handshake.rs b/src/handshake.rs index 4894878d..d596f687 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -132,9 +132,11 @@ where } } -/// This function takes a 16 byte key (base64 encoded, and so 24 bytes of input) that is expected via -/// the `Sec-WebSocket-Key` header during a websocket handshake, and a 32 byte output buffer, and -/// writes the response that's expected to be handed back in the response header `Sec-WebSocket-Accept`. +// This function takes a 16 byte key (base64 encoded, and so 24 bytes of input) that is expected via +// the `Sec-WebSocket-Key` header during a websocket handshake, and a 32 byte output buffer, and +// writes the response that's expected to be handed back in the response header `Sec-WebSocket-Accept`. +// +// See https://datatracker.ietf.org/doc/html/rfc6455#section-1.3 for more information on this. fn generate_accept_key<'k>(key_base64: &[u8; 24], output_buf: &'k mut [u8; 32]) -> &'k [u8] { let mut digest = Sha1::new(); digest.update(key_base64); From 54ea9ce055df4f47db2457a6b8ee1af8b146805b Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 17 Sep 2021 16:48:23 +0100 Subject: [PATCH 07/13] add basic logger to example so we can get log output for debugging things --- Cargo.toml | 1 + examples/hyper_server.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index b8bab2c3..37216874 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.6", features = ["compat"] } tokio-stream = { version = "0.1", features = ["net"] } hyper = { version = "0.14.10", features = ["full"] } +env_logger = "0.9.0" [[example]] name = "hyper_server" diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index 55470e22..4fab2e67 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -32,6 +32,8 @@ use tokio_util::compat::TokioAsyncReadCompatExt; /// Start up a hyper server. #[tokio::main] async fn main() -> Result<(), BoxedError> { + env_logger::init(); + let addr = ([127, 0, 0, 1], 3000).into(); let service = From 5e80e217ae4eaef32efa7f4a5c9bcd284b1b8089 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 17 Sep 2021 17:18:42 +0100 Subject: [PATCH 08/13] Fix up a couple of comments --- src/handshake/http.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/handshake/http.rs b/src/handshake/http.rs index a41c4a11..7ff8384c 100644 --- a/src/handshake/http.rs +++ b/src/handshake/http.rs @@ -8,8 +8,8 @@ /*! This module somewhat mirrors [`crate::handshake::server`], expect it's focus is on handling -externally provided [`http`] types, making it easier to integrate with external web servers -such as [`hyper`]. +an externally provided [`http::Request`] type, making it easier to integrate with external web +servers such as [`hyper`]. See `examples/hyper_server.rs` from this crate's repository for example usage. */ @@ -24,16 +24,16 @@ use http::{header, HeaderMap, Response}; use std::convert::TryInto; use std::mem; -/// A re-export of [`Error`]. +/// A re-export of [`handshake::Error`]. pub type Error = handshake::Error; /// Websocket handshake server. This is similar to [`handshake::Server`], but it is /// focused on performing the WebSocket handshake using a provided [`http::Request`], as opposed /// to decoding the request internally. pub struct Server { - /// Extensions the server supports. + // Extensions the server supports. extensions: Vec>, - /// Encoding/decoding buffer. + // Encoding/decoding buffer. buffer: BytesMut, } @@ -133,7 +133,7 @@ impl Server { } } -// Check if a request looks like a valid websocket upgrade request. +/// Check if an [`http::Request`] looks like a valid websocket upgrade request. pub fn is_upgrade_request(request: &http::Request) -> bool { header_contains_value(request.headers(), header::CONNECTION, b"upgrade") && header_contains_value(request.headers(), header::UPGRADE, b"websocket") From e606700bd9e72e472bee84745193c7946b419e84 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 17 Sep 2021 17:21:34 +0100 Subject: [PATCH 09/13] use log since we added a logger, and comment tweaks --- examples/hyper_server.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/hyper_server.rs b/examples/hyper_server.rs index 4fab2e67..9190da34 100644 --- a/examples/hyper_server.rs +++ b/examples/hyper_server.rs @@ -59,20 +59,23 @@ async fn handler(req: Request) -> Result, BoxedError server.add_extension(Box::new(deflate)); } - // Attempt begin the handshake. + // Attempt the handshake. match server.receive_request(&req) { // The handshake has been successful so far; return the response we're given back // and spawn a task to handle the long-running WebSocket server: Ok(response) => { tokio::spawn(async move { if let Err(e) = websocket_echo_messages(server, req).await { - eprintln!("Error upgrading to websocket connection: {}", e); + log::error!("Error upgrading to websocket connection: {}", e); } }); Ok(response.map(|()| Body::empty())) } // We tried to upgrade and failed early on; tell the client about the failure however we like: - Err(_e) => Ok(Response::new(Body::from("Something went wrong upgrading!"))), + Err(e) => { + log::error!("Could not upgrade connection: {}", e); + Ok(Response::new(Body::from("Something went wrong upgrading!"))) + } } } else { // The request wasn't an upgrade request; let's treat it as a standard HTTP request: From c851479ae1c7dadf0dbafe22324438f69e74e31f Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 21 Sep 2021 14:04:32 +0100 Subject: [PATCH 10/13] use WebSocketKey alias --- src/handshake.rs | 2 +- src/handshake/http.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/handshake.rs b/src/handshake.rs index d596f687..20413634 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -137,7 +137,7 @@ where // writes the response that's expected to be handed back in the response header `Sec-WebSocket-Accept`. // // See https://datatracker.ietf.org/doc/html/rfc6455#section-1.3 for more information on this. -fn generate_accept_key<'k>(key_base64: &[u8; 24], output_buf: &'k mut [u8; 32]) -> &'k [u8] { +fn generate_accept_key<'k>(key_base64: &WebSocketKey, output_buf: &'k mut [u8; 32]) -> &'k [u8] { let mut digest = Sha1::new(); digest.update(key_base64); digest.update(KEY); diff --git a/src/handshake/http.rs b/src/handshake/http.rs index 7ff8384c..04a4029c 100644 --- a/src/handshake/http.rs +++ b/src/handshake/http.rs @@ -14,7 +14,7 @@ servers such as [`hyper`]. See `examples/hyper_server.rs` from this crate's repository for example usage. */ -use super::SEC_WEBSOCKET_EXTENSIONS; +use super::{WebSocketKey, SEC_WEBSOCKET_EXTENSIONS}; use crate::connection::{self, Mode}; use crate::extension::Extension; use crate::handshake; @@ -84,7 +84,7 @@ impl Server { } // Pull out the Sec-WebSocket-Key and generate the appropriate response to it. - let key: &[u8; 24] = match key.as_bytes().try_into() { + let key: &WebSocketKey = match key.as_bytes().try_into() { Ok(key) => key, Err(_) => return Err(Error::InvalidSecWebSocketAccept), }; From 719a7822867bdb9ca51fb5d56c657ee28f28446a Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 21 Sep 2021 14:21:50 +0100 Subject: [PATCH 11/13] return 28 byte base64 encoding instead of passing buffer in --- src/handshake.rs | 16 +++++++++++----- src/handshake/http.rs | 5 ++--- src/handshake/server.rs | 5 ++--- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/handshake.rs b/src/handshake.rs index 20413634..f9d9415c 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -133,18 +133,24 @@ where } // This function takes a 16 byte key (base64 encoded, and so 24 bytes of input) that is expected via -// the `Sec-WebSocket-Key` header during a websocket handshake, and a 32 byte output buffer, and -// writes the response that's expected to be handed back in the response header `Sec-WebSocket-Accept`. +// the `Sec-WebSocket-Key` header during a websocket handshake, and writes the response that's expected +// to be handed back in the response header `Sec-WebSocket-Accept`. +// +// The response is a base64 encoding of a 160bit hash. base64 encoding uses 1 ascii character per 6 bits, +// and 160 / 6 = 26.66 characters. The output is padded with '=' to the nearest 4 characters, so we need 28 +// bytes in total for all of the characters. // // See https://datatracker.ietf.org/doc/html/rfc6455#section-1.3 for more information on this. -fn generate_accept_key<'k>(key_base64: &WebSocketKey, output_buf: &'k mut [u8; 32]) -> &'k [u8] { +fn generate_accept_key<'k>(key_base64: &WebSocketKey) -> [u8; 28] { let mut digest = Sha1::new(); digest.update(key_base64); digest.update(KEY); let d = digest.finalize(); - let n = base64::encode_config_slice(&d, base64::STANDARD, output_buf); - &output_buf[..n] + let mut output_buf = [0; 28]; + let n = base64::encode_config_slice(&d, base64::STANDARD, &mut output_buf); + debug_assert_eq!(n, 28, "encoding to base64 should be exactly 28 bytes"); + output_buf } /// Enumeration of possible handshake errors. diff --git a/src/handshake/http.rs b/src/handshake/http.rs index 04a4029c..1aec2521 100644 --- a/src/handshake/http.rs +++ b/src/handshake/http.rs @@ -88,8 +88,7 @@ impl Server { Ok(key) => key, Err(_) => return Err(Error::InvalidSecWebSocketAccept), }; - let mut accept_key_buf = [0; 32]; - let accept_key = handshake::generate_accept_key(key, &mut accept_key_buf); + let accept_key = handshake::generate_accept_key(key); // Get extension information out of the request as we'll need this as well. let extension_config = req @@ -109,7 +108,7 @@ impl Server { .status(http::StatusCode::SWITCHING_PROTOCOLS) .header(http::header::CONNECTION, "upgrade") .header(http::header::UPGRADE, "websocket") - .header("Sec-WebSocket-Accept", accept_key); + .header("Sec-WebSocket-Accept", &accept_key[..]); // Tell the client about the agreed-upon extension configuration. We reuse code to build up the // extension header value, but that does make this a little more clunky. diff --git a/src/handshake/server.rs b/src/handshake/server.rs index 648d6820..99fd5f29 100644 --- a/src/handshake/server.rs +++ b/src/handshake/server.rs @@ -186,8 +186,7 @@ impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { fn encode_response(&mut self, response: &Response<'_>) { match response { Response::Accept { key, protocol } => { - let mut key_buf = [0; 32]; - let accept_value = super::generate_accept_key(&key, &mut key_buf); + let accept_value = super::generate_accept_key(&key); self.buffer.extend_from_slice( concat![ "HTTP/1.1 101 Switching Protocols", @@ -199,7 +198,7 @@ impl<'a, T: AsyncRead + AsyncWrite + Unpin> Server<'a, T> { ] .as_bytes(), ); - self.buffer.extend_from_slice(accept_value); + self.buffer.extend_from_slice(&accept_value); if let Some(p) = protocol { self.buffer.extend_from_slice(b"\r\nSec-WebSocket-Protocol: "); self.buffer.extend_from_slice(p.as_bytes()) From 57dc84c6be7a6d94b5d5c57220156bc30c659a6f Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 21 Sep 2021 14:56:00 +0100 Subject: [PATCH 12/13] Tweak the docs a little --- src/handshake/http.rs | 6 +++--- src/lib.rs | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/handshake/http.rs b/src/handshake/http.rs index 1aec2521..7f431626 100644 --- a/src/handshake/http.rs +++ b/src/handshake/http.rs @@ -7,9 +7,9 @@ // modified, or distributed except according to those terms. /*! -This module somewhat mirrors [`crate::handshake::server`], expect it's focus is on handling -an externally provided [`http::Request`] type, making it easier to integrate with external web -servers such as [`hyper`]. +This module somewhat mirrors [`crate::handshake::server`], expect it's focus is on working +with the [`http::Request`] and [`http::Response`] types, making it easier to integrate with +external web servers such as Hyper. See `examples/hyper_server.rs` from this crate's repository for example usage. */ diff --git a/src/lib.rs b/src/lib.rs index e1665b1c..1408742c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,6 +102,10 @@ //! # } //! //! ``` +//! +//! See `examples/hyper_server.rs` from this crate's repository for an example of +//! starting up a WebSocket server alongside an Hyper HTTP server. +//! //! [client]: handshake::Client //! [server]: handshake::Server //! [Sender]: connection::Sender From 555ac9e83ea4365bf5e97347a4568177d51e2132 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 21 Sep 2021 16:37:27 +0100 Subject: [PATCH 13/13] fix a typo --- src/handshake/http.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/handshake/http.rs b/src/handshake/http.rs index 7f431626..c429019d 100644 --- a/src/handshake/http.rs +++ b/src/handshake/http.rs @@ -7,8 +7,8 @@ // modified, or distributed except according to those terms. /*! -This module somewhat mirrors [`crate::handshake::server`], expect it's focus is on working -with the [`http::Request`] and [`http::Response`] types, making it easier to integrate with +This module somewhat mirrors [`crate::handshake::server`], except it's focus is on working +with [`http::Request`] and [`http::Response`] types, making it easier to integrate with external web servers such as Hyper. See `examples/hyper_server.rs` from this crate's repository for example usage.