diff --git a/Cargo.toml b/Cargo.toml index e2f924e84..fb6662ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,8 +65,13 @@ features = [ "Win32_System_WindowsProgramming", # General future used for various types/funcs ] -[target.'cfg(target_os = "wasi")'.dependencies] +[target.'cfg(all(target_os = "wasi", not(target_env = "p2")))'.dependencies] wasi = "0.11.0" + +[target.'cfg(all(target_os = "wasi", target_env = "p2"))'.dependencies] +wasi = "0.13.3" + +[target.'cfg(target_os = "wasi")'.dependencies] libc = "0.2.159" [dev-dependencies] @@ -79,7 +84,8 @@ rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] targets = [ "aarch64-apple-ios", "aarch64-linux-android", - "wasm32-wasi", + "wasm32-wasip1", + "wasm32-wasip2", "x86_64-apple-darwin", "x86_64-pc-windows-gnu", "x86_64-pc-windows-msvc", diff --git a/src/io_source.rs b/src/io_source.rs index cd57653ab..dc3c756e2 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -104,7 +104,7 @@ impl DerefMut for IoSource { } } -#[cfg(any(unix, target_os = "hermit"))] +#[cfg(any(unix, target_os = "hermit", all(target_os = "wasi", target_env = "p2")))] impl event::Source for IoSource where T: AsRawFd, @@ -175,7 +175,7 @@ where } } -#[cfg(target_os = "wasi")] +#[cfg(all(target_os = "wasi", not(target_env = "p2")))] impl event::Source for IoSource where T: AsRawFd, diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index f3db74fab..7eb45e1a2 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -13,7 +13,7 @@ use std::os::windows::io::{ }; use crate::io_source::IoSource; -#[cfg(not(target_os = "wasi"))] +#[cfg(any(not(target_os = "wasi"), target_env = "p2"))] use crate::sys::tcp::{connect, new_for_addr}; use crate::{event, Interest, Registry, Token}; @@ -87,10 +87,10 @@ impl TcpStream { /// entries in the routing cache. /// /// [write interest]: Interest::WRITABLE - #[cfg(not(target_os = "wasi"))] + #[cfg(any(not(target_os = "wasi"), target_env = "p2"))] pub fn connect(addr: SocketAddr) -> io::Result { let socket = new_for_addr(addr)?; - #[cfg(any(unix, target_os = "hermit"))] + #[cfg(any(unix, target_os = "hermit", all(target_os = "wasi", target_env = "p2")))] let stream = unsafe { TcpStream::from_raw_fd(socket) }; #[cfg(windows)] let stream = unsafe { TcpStream::from_raw_socket(socket as _) }; diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 5f1ee9c9c..b6b71337c 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -15,6 +15,7 @@ //! * `Waker`: see [`crate::Waker`]. cfg_os_poll! { + #[cfg(not(all(target_os = "wasi", target_env = "p2")))] macro_rules! debug_detail { ( $type: ident ($event_type: ty), $test: path, diff --git a/src/sys/wasi/mod.rs b/src/sys/wasi/mod.rs index e1169d28b..ee42491bc 100644 --- a/src/sys/wasi/mod.rs +++ b/src/sys/wasi/mod.rs @@ -1,370 +1,11 @@ -//! # Notes -//! -//! The current implementation is somewhat limited. The `Waker` is not -//! implemented, as at the time of writing there is no way to support to wake-up -//! a thread from calling `poll_oneoff`. -//! -//! Furthermore the (re/de)register functions also don't work while concurrently -//! polling as both registering and polling requires a lock on the -//! `subscriptions`. -//! -//! Finally `Selector::try_clone`, required by `Registry::try_clone`, doesn't -//! work. However this could be implemented by use of an `Arc`. -//! -//! In summary, this only (barely) works using a single thread. +#[cfg(target_env = "p2")] +mod p2; -use std::cmp::min; -use std::io; -#[cfg(all(feature = "net", debug_assertions))] -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +#[cfg(target_env = "p2")] +pub(crate) use p2::*; -#[cfg(feature = "net")] -use crate::{Interest, Token}; +#[cfg(not(target_env = "p2"))] +mod p1; -cfg_net! { - pub(crate) mod tcp { - use std::io; - use std::net::{self, SocketAddr}; - - pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { - let (stream, addr) = listener.accept()?; - stream.set_nonblocking(true)?; - Ok((stream, addr)) - } - } -} - -/// Unique id for use as `SelectorId`. -#[cfg(all(debug_assertions, feature = "net"))] -static NEXT_ID: AtomicUsize = AtomicUsize::new(1); - -pub(crate) struct Selector { - #[cfg(all(debug_assertions, feature = "net"))] - id: usize, - /// Subscriptions (reads events) we're interested in. - subscriptions: Arc>>, -} - -impl Selector { - pub(crate) fn new() -> io::Result { - Ok(Selector { - #[cfg(all(debug_assertions, feature = "net"))] - id: NEXT_ID.fetch_add(1, Ordering::Relaxed), - subscriptions: Arc::new(Mutex::new(Vec::new())), - }) - } - - #[cfg(all(debug_assertions, feature = "net"))] - pub(crate) fn id(&self) -> usize { - self.id - } - - pub(crate) fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { - events.clear(); - - let mut subscriptions = self.subscriptions.lock().unwrap(); - - // If we want to a use a timeout in the `wasi_poll_oneoff()` function - // we need another subscription to the list. - if let Some(timeout) = timeout { - subscriptions.push(timeout_subscription(timeout)); - } - - // `poll_oneoff` needs the same number of events as subscriptions. - let length = subscriptions.len(); - events.reserve(length); - - debug_assert!(events.capacity() >= length); - #[cfg(debug_assertions)] - if length == 0 { - warn!( - "calling mio::Poll::poll with empty subscriptions, this likely not what you want" - ); - } - - let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) }; - - // Remove the timeout subscription we possibly added above. - if timeout.is_some() { - let timeout_sub = subscriptions.pop(); - debug_assert_eq!( - timeout_sub.unwrap().u.tag, - wasi::EVENTTYPE_CLOCK.raw(), - "failed to remove timeout subscription" - ); - } - - drop(subscriptions); // Unlock. - - match res { - Ok(n_events) => { - // Safety: `poll_oneoff` initialises the `events` for us. - unsafe { events.set_len(n_events) }; - - // Remove the timeout event. - if timeout.is_some() { - if let Some(index) = events.iter().position(is_timeout_event) { - events.swap_remove(index); - } - } - - check_errors(&events) - } - Err(err) => Err(io_err(err)), - } - } - - pub(crate) fn try_clone(&self) -> io::Result { - Ok(Selector { - #[cfg(all(debug_assertions, feature = "net"))] - id: self.id, - subscriptions: self.subscriptions.clone(), - }) - } - - #[cfg(feature = "net")] - pub(crate) fn register( - &self, - fd: wasi::Fd, - token: Token, - interests: Interest, - ) -> io::Result<()> { - let mut subscriptions = self.subscriptions.lock().unwrap(); - - if interests.is_writable() { - let subscription = wasi::Subscription { - userdata: token.0 as wasi::Userdata, - u: wasi::SubscriptionU { - tag: wasi::EVENTTYPE_FD_WRITE.raw(), - u: wasi::SubscriptionUU { - fd_write: wasi::SubscriptionFdReadwrite { - file_descriptor: fd, - }, - }, - }, - }; - subscriptions.push(subscription); - } - - if interests.is_readable() { - let subscription = wasi::Subscription { - userdata: token.0 as wasi::Userdata, - u: wasi::SubscriptionU { - tag: wasi::EVENTTYPE_FD_READ.raw(), - u: wasi::SubscriptionUU { - fd_read: wasi::SubscriptionFdReadwrite { - file_descriptor: fd, - }, - }, - }, - }; - subscriptions.push(subscription); - } - - Ok(()) - } - - #[cfg(feature = "net")] - pub(crate) fn reregister( - &self, - fd: wasi::Fd, - token: Token, - interests: Interest, - ) -> io::Result<()> { - self.deregister(fd) - .and_then(|()| self.register(fd, token, interests)) - } - - #[cfg(feature = "net")] - pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> { - let mut subscriptions = self.subscriptions.lock().unwrap(); - - let predicate = |subscription: &wasi::Subscription| { - // Safety: `subscription.u.tag` defines the type of the union in - // `subscription.u.u`. - match subscription.u.tag { - t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe { - subscription.u.u.fd_write.file_descriptor == fd - }, - t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe { - subscription.u.u.fd_read.file_descriptor == fd - }, - _ => false, - } - }; - - let mut ret = Err(io::ErrorKind::NotFound.into()); - - while let Some(index) = subscriptions.iter().position(predicate) { - subscriptions.swap_remove(index); - ret = Ok(()) - } - - ret - } -} - -/// Token used to a add a timeout subscription, also used in removing it again. -const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::MAX; - -/// Returns a `wasi::Subscription` for `timeout`. -fn timeout_subscription(timeout: Duration) -> wasi::Subscription { - wasi::Subscription { - userdata: TIMEOUT_TOKEN, - u: wasi::SubscriptionU { - tag: wasi::EVENTTYPE_CLOCK.raw(), - u: wasi::SubscriptionUU { - clock: wasi::SubscriptionClock { - id: wasi::CLOCKID_MONOTONIC, - // Timestamp is in nanoseconds. - timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos()) - as wasi::Timestamp, - // Give the implementation another millisecond to coalesce - // events. - precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp, - // Zero means the `timeout` is considered relative to the - // current time. - flags: 0, - }, - }, - }, - } -} - -fn is_timeout_event(event: &wasi::Event) -> bool { - event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN -} - -/// Check all events for possible errors, it returns the first error found. -fn check_errors(events: &[Event]) -> io::Result<()> { - for event in events { - if event.error != wasi::ERRNO_SUCCESS { - return Err(io_err(event.error)); - } - } - Ok(()) -} - -/// Convert `wasi::Errno` into an `io::Error`. -fn io_err(errno: wasi::Errno) -> io::Error { - // TODO: check if this is valid. - io::Error::from_raw_os_error(errno.raw() as i32) -} - -pub(crate) type Events = Vec; - -pub(crate) type Event = wasi::Event; - -pub(crate) mod event { - use std::fmt; - - use crate::sys::Event; - use crate::Token; - - pub(crate) fn token(event: &Event) -> Token { - Token(event.userdata as usize) - } - - pub(crate) fn is_readable(event: &Event) -> bool { - event.type_ == wasi::EVENTTYPE_FD_READ - } - - pub(crate) fn is_writable(event: &Event) -> bool { - event.type_ == wasi::EVENTTYPE_FD_WRITE - } - - pub(crate) fn is_error(_: &Event) -> bool { - // Not supported? It could be that `wasi::Event.error` could be used for - // this, but the docs say `error that occurred while processing the - // subscription request`, so it's checked in `Select::select` already. - false - } - - pub(crate) fn is_read_closed(event: &Event) -> bool { - event.type_ == wasi::EVENTTYPE_FD_READ - // Safety: checked the type of the union above. - && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 - } - - pub(crate) fn is_write_closed(event: &Event) -> bool { - event.type_ == wasi::EVENTTYPE_FD_WRITE - // Safety: checked the type of the union above. - && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 - } - - pub(crate) fn is_priority(_: &Event) -> bool { - // Not supported. - false - } - - pub(crate) fn is_aio(_: &Event) -> bool { - // Not supported. - false - } - - pub(crate) fn is_lio(_: &Event) -> bool { - // Not supported. - false - } - - pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { - debug_detail!( - TypeDetails(wasi::Eventtype), - PartialEq::eq, - wasi::EVENTTYPE_CLOCK, - wasi::EVENTTYPE_FD_READ, - wasi::EVENTTYPE_FD_WRITE, - ); - - #[allow(clippy::trivially_copy_pass_by_ref)] - fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool { - (got & want) != 0 - } - debug_detail!( - EventrwflagsDetails(wasi::Eventrwflags), - check_flag, - wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP, - ); - - struct EventFdReadwriteDetails(wasi::EventFdReadwrite); - - impl fmt::Debug for EventFdReadwriteDetails { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("EventFdReadwrite") - .field("nbytes", &self.0.nbytes) - .field("flags", &EventrwflagsDetails(self.0.flags)) - .finish() - } - } - - f.debug_struct("Event") - .field("userdata", &event.userdata) - .field("error", &event.error) - .field("type", &TypeDetails(event.type_)) - .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite)) - .finish() - } -} - -cfg_os_poll! { - cfg_io_source! { - pub(crate) struct IoSourceState; - - impl IoSourceState { - pub(crate) fn new() -> IoSourceState { - IoSourceState - } - - pub(crate) fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) - } - } - } -} +#[cfg(not(target_env = "p2"))] +pub(crate) use p1::*; diff --git a/src/sys/wasi/p1.rs b/src/sys/wasi/p1.rs new file mode 100644 index 000000000..e1169d28b --- /dev/null +++ b/src/sys/wasi/p1.rs @@ -0,0 +1,370 @@ +//! # Notes +//! +//! The current implementation is somewhat limited. The `Waker` is not +//! implemented, as at the time of writing there is no way to support to wake-up +//! a thread from calling `poll_oneoff`. +//! +//! Furthermore the (re/de)register functions also don't work while concurrently +//! polling as both registering and polling requires a lock on the +//! `subscriptions`. +//! +//! Finally `Selector::try_clone`, required by `Registry::try_clone`, doesn't +//! work. However this could be implemented by use of an `Arc`. +//! +//! In summary, this only (barely) works using a single thread. + +use std::cmp::min; +use std::io; +#[cfg(all(feature = "net", debug_assertions))] +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +#[cfg(feature = "net")] +use crate::{Interest, Token}; + +cfg_net! { + pub(crate) mod tcp { + use std::io; + use std::net::{self, SocketAddr}; + + pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { + let (stream, addr) = listener.accept()?; + stream.set_nonblocking(true)?; + Ok((stream, addr)) + } + } +} + +/// Unique id for use as `SelectorId`. +#[cfg(all(debug_assertions, feature = "net"))] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +pub(crate) struct Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: usize, + /// Subscriptions (reads events) we're interested in. + subscriptions: Arc>>, +} + +impl Selector { + pub(crate) fn new() -> io::Result { + Ok(Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + subscriptions: Arc::new(Mutex::new(Vec::new())), + }) + } + + #[cfg(all(debug_assertions, feature = "net"))] + pub(crate) fn id(&self) -> usize { + self.id + } + + pub(crate) fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + events.clear(); + + let mut subscriptions = self.subscriptions.lock().unwrap(); + + // If we want to a use a timeout in the `wasi_poll_oneoff()` function + // we need another subscription to the list. + if let Some(timeout) = timeout { + subscriptions.push(timeout_subscription(timeout)); + } + + // `poll_oneoff` needs the same number of events as subscriptions. + let length = subscriptions.len(); + events.reserve(length); + + debug_assert!(events.capacity() >= length); + #[cfg(debug_assertions)] + if length == 0 { + warn!( + "calling mio::Poll::poll with empty subscriptions, this likely not what you want" + ); + } + + let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) }; + + // Remove the timeout subscription we possibly added above. + if timeout.is_some() { + let timeout_sub = subscriptions.pop(); + debug_assert_eq!( + timeout_sub.unwrap().u.tag, + wasi::EVENTTYPE_CLOCK.raw(), + "failed to remove timeout subscription" + ); + } + + drop(subscriptions); // Unlock. + + match res { + Ok(n_events) => { + // Safety: `poll_oneoff` initialises the `events` for us. + unsafe { events.set_len(n_events) }; + + // Remove the timeout event. + if timeout.is_some() { + if let Some(index) = events.iter().position(is_timeout_event) { + events.swap_remove(index); + } + } + + check_errors(&events) + } + Err(err) => Err(io_err(err)), + } + } + + pub(crate) fn try_clone(&self) -> io::Result { + Ok(Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: self.id, + subscriptions: self.subscriptions.clone(), + }) + } + + #[cfg(feature = "net")] + pub(crate) fn register( + &self, + fd: wasi::Fd, + token: Token, + interests: Interest, + ) -> io::Result<()> { + let mut subscriptions = self.subscriptions.lock().unwrap(); + + if interests.is_writable() { + let subscription = wasi::Subscription { + userdata: token.0 as wasi::Userdata, + u: wasi::SubscriptionU { + tag: wasi::EVENTTYPE_FD_WRITE.raw(), + u: wasi::SubscriptionUU { + fd_write: wasi::SubscriptionFdReadwrite { + file_descriptor: fd, + }, + }, + }, + }; + subscriptions.push(subscription); + } + + if interests.is_readable() { + let subscription = wasi::Subscription { + userdata: token.0 as wasi::Userdata, + u: wasi::SubscriptionU { + tag: wasi::EVENTTYPE_FD_READ.raw(), + u: wasi::SubscriptionUU { + fd_read: wasi::SubscriptionFdReadwrite { + file_descriptor: fd, + }, + }, + }, + }; + subscriptions.push(subscription); + } + + Ok(()) + } + + #[cfg(feature = "net")] + pub(crate) fn reregister( + &self, + fd: wasi::Fd, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.deregister(fd) + .and_then(|()| self.register(fd, token, interests)) + } + + #[cfg(feature = "net")] + pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> { + let mut subscriptions = self.subscriptions.lock().unwrap(); + + let predicate = |subscription: &wasi::Subscription| { + // Safety: `subscription.u.tag` defines the type of the union in + // `subscription.u.u`. + match subscription.u.tag { + t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe { + subscription.u.u.fd_write.file_descriptor == fd + }, + t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe { + subscription.u.u.fd_read.file_descriptor == fd + }, + _ => false, + } + }; + + let mut ret = Err(io::ErrorKind::NotFound.into()); + + while let Some(index) = subscriptions.iter().position(predicate) { + subscriptions.swap_remove(index); + ret = Ok(()) + } + + ret + } +} + +/// Token used to a add a timeout subscription, also used in removing it again. +const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::MAX; + +/// Returns a `wasi::Subscription` for `timeout`. +fn timeout_subscription(timeout: Duration) -> wasi::Subscription { + wasi::Subscription { + userdata: TIMEOUT_TOKEN, + u: wasi::SubscriptionU { + tag: wasi::EVENTTYPE_CLOCK.raw(), + u: wasi::SubscriptionUU { + clock: wasi::SubscriptionClock { + id: wasi::CLOCKID_MONOTONIC, + // Timestamp is in nanoseconds. + timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos()) + as wasi::Timestamp, + // Give the implementation another millisecond to coalesce + // events. + precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp, + // Zero means the `timeout` is considered relative to the + // current time. + flags: 0, + }, + }, + }, + } +} + +fn is_timeout_event(event: &wasi::Event) -> bool { + event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN +} + +/// Check all events for possible errors, it returns the first error found. +fn check_errors(events: &[Event]) -> io::Result<()> { + for event in events { + if event.error != wasi::ERRNO_SUCCESS { + return Err(io_err(event.error)); + } + } + Ok(()) +} + +/// Convert `wasi::Errno` into an `io::Error`. +fn io_err(errno: wasi::Errno) -> io::Error { + // TODO: check if this is valid. + io::Error::from_raw_os_error(errno.raw() as i32) +} + +pub(crate) type Events = Vec; + +pub(crate) type Event = wasi::Event; + +pub(crate) mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + + pub(crate) fn token(event: &Event) -> Token { + Token(event.userdata as usize) + } + + pub(crate) fn is_readable(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_READ + } + + pub(crate) fn is_writable(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_WRITE + } + + pub(crate) fn is_error(_: &Event) -> bool { + // Not supported? It could be that `wasi::Event.error` could be used for + // this, but the docs say `error that occurred while processing the + // subscription request`, so it's checked in `Select::select` already. + false + } + + pub(crate) fn is_read_closed(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_READ + // Safety: checked the type of the union above. + && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 + } + + pub(crate) fn is_write_closed(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_WRITE + // Safety: checked the type of the union above. + && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 + } + + pub(crate) fn is_priority(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn is_aio(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + debug_detail!( + TypeDetails(wasi::Eventtype), + PartialEq::eq, + wasi::EVENTTYPE_CLOCK, + wasi::EVENTTYPE_FD_READ, + wasi::EVENTTYPE_FD_WRITE, + ); + + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool { + (got & want) != 0 + } + debug_detail!( + EventrwflagsDetails(wasi::Eventrwflags), + check_flag, + wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP, + ); + + struct EventFdReadwriteDetails(wasi::EventFdReadwrite); + + impl fmt::Debug for EventFdReadwriteDetails { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EventFdReadwrite") + .field("nbytes", &self.0.nbytes) + .field("flags", &EventrwflagsDetails(self.0.flags)) + .finish() + } + } + + f.debug_struct("Event") + .field("userdata", &event.userdata) + .field("error", &event.error) + .field("type", &TypeDetails(event.type_)) + .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite)) + .finish() + } +} + +cfg_os_poll! { + cfg_io_source! { + pub(crate) struct IoSourceState; + + impl IoSourceState { + pub(crate) fn new() -> IoSourceState { + IoSourceState + } + + pub(crate) fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } + } + } +} diff --git a/src/sys/wasi/p2.rs b/src/sys/wasi/p2.rs new file mode 100644 index 000000000..c7377db28 --- /dev/null +++ b/src/sys/wasi/p2.rs @@ -0,0 +1,675 @@ +//! # Notes +//! +//! As with WASIp1, WASIp2 is single-threaded, meaning there is no support for +//! `Waker`, nor is there support for (de)registering descriptors while polling. + +// # Implementation Notes +// +// This implementation currently uses a mix of POSIX-style APIs (provided by +// `wasi-libc` via the `libc` crate) and WASIp2-native APIs (provided by the +// `wasi` crate). +// +// Alternatively, we could implement `Selector` using only POSIX APIs, +// e.g. `poll(2)`. However, that would add an extra layer of abstraction to +// support and debug, as well as make it impossible to support polling +// `wasi:io/poll/pollable` objects which cannot be represented as POSIX file +// descriptors (e.g. timer events, DNS queries, HTTP requests, etc.). +// +// Another approach would be to use _only_ the WASIp2 APIs and bypass +// `wasi-libc` entirely. However, that would break interoperability with both +// Rust `std` and e.g. C libraries which expect to work with file descriptors. +// +// Since `wasi-libc` does not yet provide a public API for converting between +// file descriptors and WASIp2 resource handles, we currently use a non-public +// API (see the `netc` module below) to do so. Once +// https://github.com/WebAssembly/wasi-libc/issues/542 is addressed, we'll be +// able to switch to a public API. +// +// TODO #1: Add a public, WASIp2-only API for registering +// `wasi::io::poll::Pollable`s directly (i.e. those which do not correspond to +// any `wasi-libc` file descriptor. +// +// TODO #2: Add support for binding, listening, and accepting. This would +// involve adding cases for `TCP_SOCKET_STATE_UNBOUND`, +// `TCP_SOCKET_STATE_BOUND`, and `TCP_SOCKET_STATE_LISTENING` to the `match` +// statements in `Selector::select`. +// +// TODO #3: Add support for UDP sockets. This would involve adding cases for +// the `UDP_SOCKET_STATE_*` tags to the `match` statements in +// `Selector::select`. + +use crate::{Interest, Token}; +use netc as libc; +use std::collections::HashMap; +use std::convert::{TryFrom, TryInto}; +use std::io; +use std::mem::ManuallyDrop; +use std::ops::Deref; +use std::os::fd::RawFd; +use std::ptr; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use wasi::{ + clocks::monotonic_clock, + io::poll::{self, Pollable}, + sockets::{network::ErrorCode, tcp::TcpSocket}, +}; + +#[cfg(all(debug_assertions, feature = "net"))] +use std::sync::atomic::{AtomicUsize, Ordering}; + +#[cfg(feature = "net")] +use { + crate::Registry, + std::{ffi::c_int, net::SocketAddr}, +}; + +#[allow(unused_macros)] +macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} + +cfg_net! { + pub(crate) mod tcp { + use std::io; + use std::os::fd::AsRawFd; + use std::net::{self, SocketAddr}; + use std::ffi::c_int; + use super::{new_socket, socket_addr, netc as libc}; + + pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { + let (stream, addr) = listener.accept()?; + stream.set_nonblocking(true)?; + Ok((stream, addr)) + } + + pub(crate) fn new_for_addr(address: SocketAddr) -> io::Result { + let domain = match address { + SocketAddr::V4(_) => libc::AF_INET, + SocketAddr::V6(_) => libc::AF_INET6, + }; + new_socket(domain, libc::SOCK_STREAM) + } + + pub(crate) fn connect(socket: &net::TcpStream, addr: SocketAddr) -> io::Result<()> { + let (raw_addr, raw_addr_length) = socket_addr(&addr); + + match syscall!(connect( + socket.as_raw_fd(), + raw_addr.as_ptr(), + raw_addr_length + )) { + Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), + _ => Ok(()), + } + } + } +} + +#[cfg(all(debug_assertions, feature = "net"))] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug, Copy, Clone)] +struct Subscription { + token: Token, + interests: Option, +} + +pub(crate) struct Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: usize, + subscriptions: Arc>>, +} + +impl Selector { + pub(crate) fn new() -> io::Result { + Ok(Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + subscriptions: Arc::new(Mutex::new(HashMap::new())), + }) + } + + #[cfg(all(debug_assertions, feature = "net"))] + pub(crate) fn id(&self) -> usize { + self.id + } + + pub(crate) fn try_clone(&self) -> io::Result { + Ok(Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: self.id, + subscriptions: self.subscriptions.clone(), + }) + } + + pub(crate) fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + events.clear(); + + let mut subscriptions = self.subscriptions.lock().unwrap(); + + let mut states = Vec::new(); + for (fd, subscription) in subscriptions.deref() { + let mut entry_ref = ptr::null_mut(); + let entry = unsafe { + if libc::descriptor_table_get_ref(*fd, &mut entry_ref) { + *entry_ref + } else { + return Err(io::Error::from_raw_os_error(libc::EBADF)); + } + }; + + let readable = subscription + .interests + .map(|v| v.is_readable()) + .unwrap_or(false); + + let writable = subscription + .interests + .map(|v| v.is_writable()) + .unwrap_or(false); + + match entry.tag { + libc::descriptor_table_entry_tag_t::DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET => { + let socket = unsafe { entry.value.tcp_socket }; + match socket.state.tag { + libc::tcp_socket_state_tag_t::TCP_SOCKET_STATE_CONNECTING => { + if readable || writable { + states.push(( + ManuallyDrop::new(unsafe { + Pollable::from_handle(socket.socket_pollable) + }), + *fd, + socket, + *subscription, + subscription.interests.unwrap(), + )); + } + } + + libc::tcp_socket_state_tag_t::TCP_SOCKET_STATE_CONNECTED => { + if writable { + states.push(( + ManuallyDrop::new(unsafe { + Pollable::from_handle( + socket.state.value.connected.output_pollable, + ) + }), + *fd, + socket, + *subscription, + Interest::WRITABLE, + )); + } + + if readable { + states.push(( + ManuallyDrop::new(unsafe { + Pollable::from_handle( + socket.state.value.connected.input_pollable, + ) + }), + *fd, + socket, + *subscription, + Interest::READABLE, + )); + } + } + + _ => return Err(io::Error::from_raw_os_error(libc::EBADF)), + } + } + + _ => return Err(io::Error::from_raw_os_error(libc::EBADF)), + } + } + + let mut pollables = states + .iter() + .map(|(pollable, ..)| pollable.deref()) + .collect::>(); + + let timeout = timeout.map(|timeout| { + monotonic_clock::subscribe_duration(timeout.as_nanos().try_into().unwrap()) + }); + pollables.extend(&timeout); + + #[cfg(debug_assertions)] + if pollables.is_empty() { + warn!("calling mio::Poll::poll with empty pollables; this likely not what you want"); + } + + for index in poll::poll(&pollables) { + let index = usize::try_from(index).unwrap(); + if timeout.is_none() || index != pollables.len() - 1 { + let (_, fd, socket, subscription, interests) = &states[index]; + + let mut push_event = || { + events.push(Event { + token: subscription.token, + interests: *interests, + }) + }; + + if socket.state.tag == libc::tcp_socket_state_tag_t::TCP_SOCKET_STATE_CONNECTING { + let socket_resource = + ManuallyDrop::new(unsafe { TcpSocket::from_handle(socket.socket) }); + + let socket_ptr = || unsafe { + let mut entry_ref = ptr::null_mut(); + if libc::descriptor_table_get_ref(*fd, &mut entry_ref) { + &mut (*entry_ref).value.tcp_socket + } else { + unreachable!(); + } + }; + + match socket_resource.finish_connect() { + Ok((rx, tx)) => { + let socket_ptr = socket_ptr(); + socket_ptr.state = libc::tcp_socket_state_t { + tag: libc::tcp_socket_state_tag_t::TCP_SOCKET_STATE_CONNECTED, + value: libc::tcp_socket_state_value_t { + connected: libc::tcp_socket_state_connected_t { + input_pollable: rx.subscribe().take_handle(), + input: rx.take_handle(), + output_pollable: tx.subscribe().take_handle(), + output: tx.take_handle(), + }, + }, + }; + push_event(); + } + Err(ErrorCode::WouldBlock) => {} + Err(error) => { + let socket_ptr = socket_ptr(); + socket_ptr.state = libc::tcp_socket_state_t { + tag: libc::tcp_socket_state_tag_t::TCP_SOCKET_STATE_CONNECT_FAILED, + value: libc::tcp_socket_state_value_t { + connect_failed: libc::tcp_socket_state_connect_failed_t { + error_code: error as u8, + }, + }, + }; + push_event(); + } + } + } else { + // Emulate edge-triggering by deregistering interest in `interests`; `IoSourceState::do_io` + // will re-register if/when appropriate. + let fd_interests = &mut subscriptions.get_mut(fd).unwrap().interests; + *fd_interests = (*fd_interests).and_then(|v| v.remove(*interests)); + push_event(); + } + } + } + + Ok(()) + } +} + +pub(crate) type Events = Vec; + +#[derive(Debug, Copy, Clone)] +pub(crate) struct Event { + token: Token, + interests: Interest, +} + +pub(crate) mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + + pub(crate) fn token(event: &Event) -> Token { + event.token + } + + pub(crate) fn is_readable(event: &Event) -> bool { + event.interests.is_readable() + } + + pub(crate) fn is_writable(event: &Event) -> bool { + event.interests.is_writable() + } + + pub(crate) fn is_error(_: &Event) -> bool { + false + } + + pub(crate) fn is_read_closed(_: &Event) -> bool { + false + } + + pub(crate) fn is_write_closed(_: &Event) -> bool { + false + } + + pub(crate) fn is_priority(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn is_aio(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + use std::fmt::Debug; + event.fmt(f) + } +} + +cfg_os_poll! { + cfg_io_source! { + struct Registration { + subscriptions: Arc>>, + token: Token, + interests: Interest, + fd: RawFd, + } + + pub(crate) struct IoSourceState { + registration: Option + } + + impl IoSourceState { + pub(crate) fn new() -> Self { + IoSourceState { registration: None } + } + + pub(crate) fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + + self.registration.as_ref().map(|registration| { + *registration.subscriptions.lock().unwrap().get_mut(®istration.fd).unwrap() = + Subscription { + token: registration.token, + interests: Some(registration.interests) + }; + }); + + result + } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + if self.registration.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + let subscriptions = registry.selector().subscriptions.clone(); + subscriptions.lock().unwrap().insert(fd, Subscription { token, interests: Some(interests) }); + self.registration = Some(Registration { + subscriptions, token, interests, fd + }); + Ok(()) + } + } + + pub fn reregister( + &mut self, + _registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + if let Some(registration) = &self.registration { + *registration.subscriptions.lock().unwrap().get_mut(&fd).unwrap() = Subscription { + token, + interests: Some(interests) + }; + Ok(()) + } else { + Err(io::ErrorKind::NotFound.into()) + } + } + + pub fn deregister(&mut self, _registry: &Registry, fd: RawFd) -> io::Result<()> { + if let Some(registration) = self.registration.take() { + registration.subscriptions.lock().unwrap().remove(&fd); + } + Ok(()) + } + } + + impl Drop for IoSourceState { + fn drop(&mut self) { + if let Some(registration) = self.registration.take() { + registration.subscriptions.lock().unwrap().remove(®istration.fd); + } + } + } + } +} + +/// Create a new non-blocking socket. +#[cfg(feature = "net")] +pub(crate) fn new_socket(domain: c_int, socket_type: c_int) -> io::Result { + let socket_type = socket_type | libc::SOCK_NONBLOCK; + + let socket = syscall!(socket(domain, socket_type, 0))?; + + Ok(socket) +} + +#[cfg(feature = "net")] +#[repr(C)] +pub(crate) union SocketAddrCRepr { + v4: libc::sockaddr_in, + v6: libc::sockaddr_in6, +} + +#[cfg(feature = "net")] +impl SocketAddrCRepr { + pub(crate) fn as_ptr(&self) -> *const libc::sockaddr { + self as *const _ as *const libc::sockaddr + } +} + +/// Converts a Rust `SocketAddr` into the system representation. +#[cfg(feature = "net")] +pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_t) { + match addr { + SocketAddr::V4(ref addr) => { + // `s_addr` is stored as BE on all machine and the array is in BE order. + // So the native endian conversion method is used so that it's never swapped. + let sin_addr = libc::in_addr { + s_addr: u32::from_ne_bytes(addr.ip().octets()), + }; + + let sockaddr_in = libc::sockaddr_in { + sin_family: libc::AF_INET as libc::sa_family_t, + sin_port: addr.port().to_be(), + sin_addr, + }; + + let sockaddr = SocketAddrCRepr { v4: sockaddr_in }; + let socklen = std::mem::size_of::() as libc::socklen_t; + (sockaddr, socklen) + } + SocketAddr::V6(ref addr) => { + let sockaddr_in6 = libc::sockaddr_in6 { + sin6_family: libc::AF_INET6 as libc::sa_family_t, + sin6_port: addr.port().to_be(), + sin6_addr: libc::in6_addr { + s6_addr: addr.ip().octets(), + }, + sin6_flowinfo: addr.flowinfo(), + sin6_scope_id: addr.scope_id(), + }; + + let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 }; + let socklen = std::mem::size_of::() as libc::socklen_t; + (sockaddr, socklen) + } + } +} + +#[allow(non_camel_case_types, dead_code)] +mod netc { + pub use libc::*; + + // TODO: This should be defined in `libc` but isn't as of v0.2.159: + pub const SOCK_NONBLOCK: c_int = 0x4000; + + // The remainder of this module represents the non-public `wasi-libc` API + // for converting between POSIX file descriptors and WASIp2 resource + // handles. Once https://github.com/WebAssembly/wasi-libc/issues/542 has + // been addressed we'll be able to switch to a public API. + + #[repr(C)] + #[derive(Copy, Clone, Eq, PartialEq, Debug)] + pub enum descriptor_table_entry_tag_t { + DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET, + DESCRIPTOR_TABLE_ENTRY_UDP_SOCKET, + } + + #[repr(C)] + #[derive(Copy, Clone, Eq, PartialEq, Debug)] + pub enum tcp_socket_state_tag_t { + TCP_SOCKET_STATE_UNBOUND, + TCP_SOCKET_STATE_BOUND, + TCP_SOCKET_STATE_CONNECTING, + TCP_SOCKET_STATE_CONNECTED, + TCP_SOCKET_STATE_CONNECT_FAILED, + TCP_SOCKET_STATE_LISTENING, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct tcp_socket_state_connected_t { + pub input: u32, + pub input_pollable: u32, + pub output: u32, + pub output_pollable: u32, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct tcp_socket_state_connect_failed_t { + pub error_code: u8, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub union tcp_socket_state_value_t { + pub connected: tcp_socket_state_connected_t, + pub connect_failed: tcp_socket_state_connect_failed_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct tcp_socket_state_t { + pub tag: tcp_socket_state_tag_t, + pub value: tcp_socket_state_value_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct tcp_socket_t { + pub socket: u32, + pub socket_pollable: u32, + pub blocking: bool, + pub fake_nodelay: bool, + pub family: u8, + pub state: tcp_socket_state_t, + } + + #[repr(C)] + #[derive(Copy, Clone, Eq, PartialEq, Debug)] + pub enum udp_socket_state_tag_t { + UDP_SOCKET_STATE_UNBOUND, + UDP_SOCKET_STATE_BOUND_NOSTREAMS, + UDP_SOCKET_STATE_BOUND_STREAMING, + UDP_SOCKET_STATE_CONNECTED, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct udp_socket_streams_t { + pub incoming: u32, + pub incoming_pollable: u32, + pub outgoing: u32, + pub outgoing_pollable: u32, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct udp_socket_state_bound_streaming_t { + pub streams: udp_socket_streams_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct udp_socket_state_connected_t { + pub streams: udp_socket_streams_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub union udp_socket_state_value_t { + pub bound_streaming: udp_socket_state_bound_streaming_t, + pub connected: udp_socket_state_connected_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct udp_socket_state_t { + pub tag: udp_socket_state_tag_t, + pub value: udp_socket_state_value_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct udp_socket_t { + pub socket: u32, + pub socket_pollable: u32, + pub blocking: bool, + pub family: u8, + pub state: udp_socket_state_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub union descriptor_table_entry_value_t { + pub tcp_socket: tcp_socket_t, + pub udp_socket: udp_socket_t, + } + + #[repr(C)] + #[derive(Copy, Clone)] + pub struct descriptor_table_entry_t { + pub tag: descriptor_table_entry_tag_t, + pub value: descriptor_table_entry_value_t, + } + + extern "C" { + pub fn descriptor_table_get_ref( + fd: c_int, + entry: *mut *mut descriptor_table_entry_t, + ) -> bool; + } +}