Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websockets and upgrade mechanism #2466

Closed
wants to merge 7 commits into from
Closed

Conversation

Mai-Lapyst
Copy link
Contributor

This PR implements websockets in rocket via a new API that exposes the hyper upgrade mechanism to applications using rocket. Potentially closes #90.

This is an WIP PR so people can start testing it. If it's needed I can split this PR into one for the upgrade mechanism and one for the websocket support.

  • Implementing an API to upgrade any connection from http to something custom via the rocket::upgrade::Upgrade trait.
  • Implementing an websocket integration via the new upgrade API.
  • Support for sending / recieving websocket messages

Roadmap / things left to do:

  • Maybe a better way to send messages via a dedicated IntoMessage trait
  • Handling of websocket extensions
  • Implementing the common extension permessage-deflate
  • Handling of websocket sub-protocols
  • Examples and tests

Upgrade API

The upgrade api works by defining an custom upgrade handler and giving it to an response:

struct TestSocket {}

#[crate::async_trait]
impl Upgrade<'static> for TestSocket {
  async fn start(&self, upgraded: crate::http::hyper::upgrade::Upgraded) {
    // can fully use the hyper::upgrade::Upgraded struct
  }
}

impl<'r, F> Responder<'r, 'r> for TestSocket {
  fn respond_to(self, req: &'r Request<'_>) -> response::Result<'r> {
    Response::build()
      .status(Status::SwitchingProtocols)
      .raw_header("Connection", "upgrade")
      .raw_header("Upgrade", "testsocket")
      .upgrade(Some(Box::new(self)))
      .ok()
  }
}

Websocket API

The websocket API is modeled loosely after this comment #90 (comment)

use rocket::response::websocket::{WebsocketMessage, WebsocketChannel, CreateWebsocket, Websocket};

#[get("/ws")]
fn websock() -> Websocket![] {
  CreateWebsocket! {
    while let Some(msg) = ch.next().await {
      println!("handler {msg:?}");
      ch.send(msg);
    }
  }
}

@tguichaoua
Copy link

The usage of CreateWebsocket is pretty unclear: nothing indicates there is a variable ch, its name isn't relevant and may shadow another variable.
I suggest to use the closure's syntax: user can choose the ident they want for the argument and whether or not to use move. Even if move is probably required because of the lifetime requirements for the async task, at least it make it explicit for the reader.

CreateWebsocket! {
    move |channel| {
        while let Some(msg) = channel.next().await {
            println!("handler {msg:?}");
            channel.send(msg);
        }
    }
}

@Mai-Lapyst
Copy link
Contributor Author

Thanks for the feedback! Have reworked the macro quite a bit and it now works with the suggessted syntax:

CreateWebsocket! {
    move |mut channel| {
        while let Some(msg) = channel.next().await {
            println!("handler {msg:?}");
            channel.send(msg);
        }
    }
}

I also added some diagnostics for some common errors for this macro (wrong input count for the closure, missing 'move').

core/codegen/src/bang/websocket.rs Outdated Show resolved Hide resolved
core/codegen/src/bang/websocket.rs Show resolved Hide resolved
core/codegen/src/bang/websocket.rs Outdated Show resolved Hide resolved
let tokens = quote!(
Websocket::create(|#inp| {
::std::boxed::Box::new(
::std::boxed::Box::pin(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary to pin the async block here.
This also makes a double allocation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed since the rokio::join! for the start() method of impl Upgrade for Websocket requires Unpin, and it seems that only a inner Box::pin solves the issue; atleast thats also what rust itself suggests.

https://github.com/Mai-Lapyst/Rocket/blob/6b9cc12c40683ebe3286046893af9fd0a3cbfac0/core/lib/src/response/websocket/mod.rs#L266

Compiler error
error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be unpinned
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:28
    |
266 |         tokio::join!(a, b, event_loop);
    |         -------------------^^^^^^^^^^-
    |         |                  |
    |         |                  the trait `Unpin` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
    |         required by a bound introduced by this call
    |
    = note: consider using `Box::pin`
    = note: required for `Box<dyn futures::Future<Output = ()> + std::marker::Send>` to implement `futures::Future`
note: required by a bound in `tokio::future::maybe_done::maybe_done`
   --> /home/mai/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.21.2/src/future/maybe_done.rs:24:24
    |
24  | pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
    |                        ^^^^^^ required by this bound in `maybe_done`

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be unpinned
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |         tokio::join!(a, b, event_loop);
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
    |
    = note: consider using `Box::pin`
    = note: required for `Box<dyn futures::Future<Output = ()> + std::marker::Send>` to implement `futures::Future`
note: required by a bound in `tokio::future::maybe_done::MaybeDone`
   --> /home/mai/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.21.2/src/future/maybe_done.rs:10:25
    |
10  | pub enum MaybeDone<Fut: Future> {
    |                         ^^^^^^ required by this bound in `MaybeDone`
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be unpinned
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |         tokio::join!(a, b, event_loop);
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
    |
    = note: consider using `Box::pin`
    = note: required for `Box<dyn futures::Future<Output = ()> + std::marker::Send>` to implement `futures::Future`
    = note: required because it appears within the type `MaybeDone<Box<dyn Future<Output = ()> + Send>>`
    = note: required because it appears within the type `(MaybeDone<impl Future<Output = ()>>, MaybeDone<impl Future<Output = ()>>, MaybeDone<Box<dyn Future<Output = ()> + Send>>)`
    = note: tuples must have a statically known size to be initialized
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0599]: the method `poll` exists for struct `Pin<&mut MaybeDone<Box<dyn Future<Output = ()> + Send>>>`, but its trait bounds were not satisfied
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |           tokio::join!(a, b, event_loop);
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ method cannot be called due to unsatisfied trait bounds
    |
   ::: /home/mai/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.21.2/src/future/maybe_done.rs:10:1
    |
10  |   pub enum MaybeDone<Fut: Future> {
    |   ------------------------------- doesn't satisfy `_: Future`
    |
   ::: /home/mai/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:198:1
    |
198 | / pub struct Box<
199 | |     T: ?Sized,
200 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
201 | | >(Unique<T>, A);
    | |_- doesn't satisfy `_: Future`
    |
    = note: the following trait bounds were not satisfied:
            `Box<dyn futures::Future<Output = ()> + std::marker::Send>: futures::Future`
            which is required by `tokio::future::maybe_done::MaybeDone<Box<dyn futures::Future<Output = ()> + std::marker::Send>>: futures::Future`
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0599]: the method `take_output` exists for struct `Pin<&mut MaybeDone<Box<dyn Future<Output = ()> + Send>>>`, but its trait bounds were not satisfied
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |           tokio::join!(a, b, event_loop);
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ method cannot be called due to unsatisfied trait bounds
    |
   ::: /home/mai/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:37:1
    |
37  |   pub trait Future {
    |   ---------------- doesn't satisfy `_: Unpin`
    |
   ::: /home/mai/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:198:1
    |
198 | / pub struct Box<
199 | |     T: ?Sized,
200 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
201 | | >(Unique<T>, A);
    | |_- doesn't satisfy `_: Future`
    |
    = note: the following trait bounds were not satisfied:
            `Box<dyn futures::Future<Output = ()> + std::marker::Send>: futures::Future`
            `dyn futures::Future<Output = ()> + std::marker::Send: Unpin`
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I didn't know async blocks are !Unpin.

Copy link

@tguichaoua tguichaoua Mar 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know if it will work, but what if you use Box::into_pin(event_loop) or tokio::pin!(event_loop) before using tokio::join! ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Box::into_pin it's working; added a commit.

@SergioBenitez
Copy link
Member

I'm closing this as I want WebSocket support to live external to Rocket initially. We should be able to implement this exact API externally. Let's focus on #2488 which makes that happen.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Native WebSocket support
3 participants