Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websockets and upgrade mechanism #2466

Closed
wants to merge 7 commits into from
6 changes: 6 additions & 0 deletions core/codegen/src/bang/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod test_guide;
mod export;

pub mod typed_stream;
pub mod websocket;

use devise::Result;
use syn::{Path, punctuated::Punctuated, parse::Parser, Token};
Expand Down Expand Up @@ -77,3 +78,8 @@ pub fn typed_stream(input: proc_macro::TokenStream) -> TokenStream {
typed_stream::_macro(input)
.unwrap_or_else(|diag| diag.emit_as_item_tokens())
}

pub fn websocket(input: proc_macro::TokenStream) -> TokenStream {
websocket::_macro(input)
.unwrap_or_else(|diag| diag.emit_as_item_tokens())
}
56 changes: 56 additions & 0 deletions core/codegen/src/bang/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use devise::{Level, Diagnostic, Spanned};
use proc_macro2::TokenStream;
use syn::{parse::{Parse, ParseStream, discouraged::Speculative}, Pat};

pub fn _macro(input: proc_macro::TokenStream) -> devise::Result<TokenStream> {
let closure: syn::ExprClosure = syn::parse(input)?;

if closure.inputs.len() != 1 {
return Err(Diagnostic::spanned(
closure.inputs.span(),
Level::Error,
"rocket::response::websocket::CreateWebsocket! needs exactly one closure input"
));
}

if closure.capture.is_none() {
return Err(
Diagnostic::spanned(
closure.span(),
Level::Error,
"rocket::response::websocket::CreateWebsocket! needs an closure that captures it's inputs"
)
.span_help(closure.or1_token.span(), "add the `move` keyword to the closure")
);
}
Mai-Lapyst marked this conversation as resolved.
Show resolved Hide resolved

let inp = closure.inputs.first().unwrap();
match inp {
Pat::Ident(_) => {}
Pat::Type(_) => {}
_ => {
return Err(
Diagnostic::spanned(
inp.span(),
Level::Error,
"rocket::response::websocket::CreateWebsocket! can only accept an identifier or a type ascription for closure input"
)
)
}
}
Mai-Lapyst marked this conversation as resolved.
Show resolved Hide resolved

let body = closure.body;
let capture = closure.capture;
let tokens = quote!(
Websocket::create(|#inp| {
Mai-Lapyst marked this conversation as resolved.
Show resolved Hide resolved
::std::boxed::Box::new(
::std::boxed::Box::pin(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary to pin the async block here.
This also makes a double allocation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed since the rokio::join! for the start() method of impl Upgrade for Websocket requires Unpin, and it seems that only a inner Box::pin solves the issue; atleast thats also what rust itself suggests.

https://github.com/Mai-Lapyst/Rocket/blob/6b9cc12c40683ebe3286046893af9fd0a3cbfac0/core/lib/src/response/websocket/mod.rs#L266

Compiler error
error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be unpinned
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:28
    |
266 |         tokio::join!(a, b, event_loop);
    |         -------------------^^^^^^^^^^-
    |         |                  |
    |         |                  the trait `Unpin` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
    |         required by a bound introduced by this call
    |
    = note: consider using `Box::pin`
    = note: required for `Box<dyn futures::Future<Output = ()> + std::marker::Send>` to implement `futures::Future`
note: required by a bound in `tokio::future::maybe_done::maybe_done`
   --> /home/mai/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.21.2/src/future/maybe_done.rs:24:24
    |
24  | pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
    |                        ^^^^^^ required by this bound in `maybe_done`

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be unpinned
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |         tokio::join!(a, b, event_loop);
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
    |
    = note: consider using `Box::pin`
    = note: required for `Box<dyn futures::Future<Output = ()> + std::marker::Send>` to implement `futures::Future`
note: required by a bound in `tokio::future::maybe_done::MaybeDone`
   --> /home/mai/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.21.2/src/future/maybe_done.rs:10:25
    |
10  | pub enum MaybeDone<Fut: Future> {
    |                         ^^^^^^ required by this bound in `MaybeDone`
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be unpinned
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |         tokio::join!(a, b, event_loop);
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
    |
    = note: consider using `Box::pin`
    = note: required for `Box<dyn futures::Future<Output = ()> + std::marker::Send>` to implement `futures::Future`
    = note: required because it appears within the type `MaybeDone<Box<dyn Future<Output = ()> + Send>>`
    = note: required because it appears within the type `(MaybeDone<impl Future<Output = ()>>, MaybeDone<impl Future<Output = ()>>, MaybeDone<Box<dyn Future<Output = ()> + Send>>)`
    = note: tuples must have a statically known size to be initialized
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0599]: the method `poll` exists for struct `Pin<&mut MaybeDone<Box<dyn Future<Output = ()> + Send>>>`, but its trait bounds were not satisfied
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |           tokio::join!(a, b, event_loop);
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ method cannot be called due to unsatisfied trait bounds
    |
   ::: /home/mai/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.21.2/src/future/maybe_done.rs:10:1
    |
10  |   pub enum MaybeDone<Fut: Future> {
    |   ------------------------------- doesn't satisfy `_: Future`
    |
   ::: /home/mai/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:198:1
    |
198 | / pub struct Box<
199 | |     T: ?Sized,
200 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
201 | | >(Unique<T>, A);
    | |_- doesn't satisfy `_: Future`
    |
    = note: the following trait bounds were not satisfied:
            `Box<dyn futures::Future<Output = ()> + std::marker::Send>: futures::Future`
            which is required by `tokio::future::maybe_done::MaybeDone<Box<dyn futures::Future<Output = ()> + std::marker::Send>>: futures::Future`
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0599]: the method `take_output` exists for struct `Pin<&mut MaybeDone<Box<dyn Future<Output = ()> + Send>>>`, but its trait bounds were not satisfied
   --> /home/mai/projects/github/Rocket/core/lib/src/response/websocket/mod.rs:266:9
    |
266 |           tokio::join!(a, b, event_loop);
    |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ method cannot be called due to unsatisfied trait bounds
    |
   ::: /home/mai/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:37:1
    |
37  |   pub trait Future {
    |   ---------------- doesn't satisfy `_: Unpin`
    |
   ::: /home/mai/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:198:1
    |
198 | / pub struct Box<
199 | |     T: ?Sized,
200 | |     #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
201 | | >(Unique<T>, A);
    | |_- doesn't satisfy `_: Future`
    |
    = note: the following trait bounds were not satisfied:
            `Box<dyn futures::Future<Output = ()> + std::marker::Send>: futures::Future`
            `dyn futures::Future<Output = ()> + std::marker::Send: Unpin`
    = note: this error originates in the macro `$crate::join` which comes from the expansion of the macro `tokio::join` (in Nightly builds, run with -Z macro-backtrace for more info)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I didn't know async blocks are !Unpin.

Copy link

@tguichaoua tguichaoua Mar 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know if it will work, but what if you use Box::into_pin(event_loop) or tokio::pin!(event_loop) before using tokio::join! ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Box::into_pin it's working; added a commit.

async #capture {
#body
}
)
)
})
);
Ok(tokens)
}
13 changes: 1 addition & 12 deletions core/codegen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1490,18 +1490,7 @@ pub fn __typed_stream(input: TokenStream) -> TokenStream {
#[proc_macro]
#[doc(hidden)]
pub fn __websocket(input: TokenStream) -> TokenStream {
let stmts = syn::Block::parse_within.parse(input).expect("Input to __websocket! should be statements");
quote!(
Websocket::create(|mut ch: WebsocketChannel| {
::std::boxed::Box::new(
::std::boxed::Box::pin(
async move {
#(#stmts)*
}
)
)
})
).into()
emit!(bang::websocket(input))
}

/// Private Rocket internal macro: `internal_guide_tests!`.
Expand Down