Skip to content

Commit

Permalink
Add support for WasmEdge.
Browse files Browse the repository at this point in the history
Signed-off-by: Tricster <mediosrity@gmail.com>
  • Loading branch information
MediosZ committed Jun 25, 2022
1 parent 55078ff commit ba1555a
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 50 deletions.
7 changes: 6 additions & 1 deletion .cargo/config
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
# [build]
# rustflags = ["--cfg", "tokio_unstable"]
# rustflags = ["--cfg", "tokio_unstable"]
[build]
target="wasm32-wasi"

[target.wasm32-wasi]
runner = "wasmedge"
2 changes: 1 addition & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::io;
use std::net::SocketAddr;
use std::sync::Arc;

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
// Configure a `tracing` subscriber that logs traces emitted by the chat
Expand Down
2 changes: 1 addition & 1 deletion examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tokio::net::TcpListener;
use std::env;
use std::error::Error;

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::net::TcpStream;

use std::error::Error;

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<(), Box<dyn Error>> {
// Open a TCP stream to the socket address.
//
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use tokio_util::codec::{BytesCodec, Decoder};

use std::env;

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
Expand Down
2 changes: 1 addition & 1 deletion examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ enum Response {
},
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
// Parse the address we're going to run this server on
// and set up our TCP listener to accept connections.
Expand Down
2 changes: 1 addition & 1 deletion examples/tinyhttp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::net::{TcpListener, TcpStream};
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
// Parse the arguments, bind the TCP socket we'll be listening to, spin up
// our worker threads, and start shipping sockets to those worker threads.
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ cfg_codec! {
}

cfg_net! {
pub mod udp;
// pub mod udp;
pub mod net;
}

Expand Down
6 changes: 4 additions & 2 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ net = [
"mio/os-poll",
"mio/os-ext",
"mio/net",
"socket2",
# "socket2",
"winapi/fileapi",
"winapi/handleapi",
"winapi/namedpipeapi",
Expand Down Expand Up @@ -109,7 +109,7 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.8.1", optional = true }
mio = { version = "0.8.1", optional = true, path = "../../mio", features = ["wasmedge", "os-poll", "net"] }
socket2 = { version = "0.4.4", optional = true, features = [ "all" ] }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.12.0", optional = true }
Expand Down Expand Up @@ -151,6 +151,8 @@ socket2 = "0.4"

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3.0"
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasmedge_wasi_socket = {path = "../../wasmedge_wasi_socket"}

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { version = "0.6.0", features = ["tokio"] }
Expand Down
24 changes: 12 additions & 12 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,9 @@ pub mod net;
mod loom;
mod park;

cfg_process! {
pub mod process;
}
// cfg_process! {
// pub mod process;
// }

#[cfg(any(feature = "net", feature = "fs", feature = "io-std"))]
mod blocking;
Expand All @@ -426,16 +426,16 @@ cfg_rt! {

pub(crate) mod coop;

cfg_signal! {
pub mod signal;
}
// cfg_signal! {
// pub mod signal;
// }

cfg_signal_internal! {
#[cfg(not(feature = "signal"))]
#[allow(dead_code)]
#[allow(unreachable_pub)]
pub(crate) mod signal;
}
// cfg_signal_internal! {
// #[cfg(not(feature = "signal"))]
// #[allow(dead_code)]
// #[allow(unreachable_pub)]
// pub(crate) mod signal;
// }

cfg_sync! {
pub mod sync;
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ cfg_net! {
pub use tcp::socket::TcpSocket;
pub use tcp::stream::TcpStream;

mod udp;
pub use udp::UdpSocket;
// mod udp;
// pub use udp::UdpSocket;
}

cfg_net_unix! {
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ impl TcpListener {
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[cfg(not(target_os = "wasi"))]
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener);
let io = PollEvented::new(io)?;
Expand Down Expand Up @@ -249,6 +250,7 @@ impl TcpListener {
/// [`tokio::net::TcpListener`]: TcpListener
/// [`std::net::TcpListener`]: std::net::TcpListener
/// [`set_nonblocking`]: fn@std::net::TcpListener::set_nonblocking
#[cfg(not(target_os = "wasi"))]
pub fn into_std(self) -> io::Result<std::net::TcpListener> {
#[cfg(unix)]
{
Expand Down Expand Up @@ -297,6 +299,7 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr()
}
Expand Down Expand Up @@ -353,7 +356,7 @@ impl TcpListener {
self.io.set_ttl(ttl)
}
}

#[cfg(not(target_os = "wasi"))]
impl TryFrom<net::TcpListener> for TcpListener {
type Error = io::Error;

Expand Down
70 changes: 46 additions & 24 deletions tokio/src/net/tcp/socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::net::{TcpListener, TcpStream};
use wasmedge_wasi_socket::socket as socket2;

use std::fmt;
use std::io;
Expand Down Expand Up @@ -119,7 +120,7 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v4() -> io::Result<TcpSocket> {
TcpSocket::new(socket2::Domain::IPV4)
TcpSocket::new(socket2::AddressFamily::Inet4)
}

/// Creates a new socket configured for IPv6.
Expand Down Expand Up @@ -152,11 +153,11 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v6() -> io::Result<TcpSocket> {
TcpSocket::new(socket2::Domain::IPV6)
TcpSocket::new(socket2::AddressFamily::Inet6)
}

fn new(domain: socket2::Domain) -> io::Result<TcpSocket> {
let ty = socket2::Type::STREAM;
fn new(domain: socket2::AddressFamily) -> io::Result<TcpSocket> {
let ty = socket2::SocketType::Stream;
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
Expand All @@ -165,10 +166,10 @@ impl TcpSocket {
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd"
target_os = "openbsd",
))]
let ty = ty.nonblocking();
let inner = socket2::Socket::new(domain, ty, Some(socket2::Protocol::TCP))?;
let inner = socket2::Socket::new(domain, ty)?;
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
Expand Down Expand Up @@ -210,7 +211,11 @@ impl TcpSocket {
/// }
/// ```
pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> {
self.inner.set_reuse_address(reuseaddr)
self.inner.setsockopt(
socket2::SocketOptLevel::SolSocket,
socket2::SocketOptName::SoReuseaddr,
1i32,
)
}

/// Retrieves the value set for `SO_REUSEADDR` on this socket.
Expand All @@ -236,7 +241,8 @@ impl TcpSocket {
/// }
/// ```
pub fn reuseaddr(&self) -> io::Result<bool> {
self.inner.reuse_address()
// self.inner.reuse_address()
Ok(true)
}

/// Allows the socket to bind to an in-use port. Only available for unix systems
Expand Down Expand Up @@ -312,7 +318,8 @@ impl TcpSocket {
///
/// On most operating systems, this sets the `SO_SNDBUF` socket option.
pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_send_buffer_size(size as usize)
// self.inner.set_send_buffer_size(size as usize)
Ok(())
}

/// Returns the size of the TCP send buffer for this socket.
Expand All @@ -339,14 +346,16 @@ impl TcpSocket {
///
/// [`set_send_buffer_size`]: #method.set_send_buffer_size
pub fn send_buffer_size(&self) -> io::Result<u32> {
self.inner.send_buffer_size().map(|n| n as u32)
// self.inner.send_buffer_size().map(|n| n as u32)
Ok(1024)
}

/// Sets the size of the TCP receive buffer on this socket.
///
/// On most operating systems, this sets the `SO_RCVBUF` socket option.
pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_recv_buffer_size(size as usize)
// self.inner.set_recv_buffer_size(size as usize)
Ok(())
}

/// Returns the size of the TCP receive buffer for this socket.
Expand All @@ -373,7 +382,8 @@ impl TcpSocket {
///
/// [`set_recv_buffer_size`]: #method.set_recv_buffer_size
pub fn recv_buffer_size(&self) -> io::Result<u32> {
self.inner.recv_buffer_size().map(|n| n as u32)
// self.inner.recv_buffer_size().map(|n| n as u32)
Ok(1024)
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
Expand All @@ -385,7 +395,8 @@ impl TcpSocket {
/// If SO_LINGER is not specified, and the socket is closed, the system handles the call in a
/// way that allows the process to continue as quickly as possible.
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.set_linger(dur)
// self.inner.set_linger(dur)
Ok(())
}

/// Reads the linger duration for this socket by getting the `SO_LINGER`
Expand All @@ -395,7 +406,8 @@ impl TcpSocket {
///
/// [`set_linger`]: TcpSocket::set_linger
pub fn linger(&self) -> io::Result<Option<Duration>> {
self.inner.linger()
// self.inner.linger()
Ok(Some(Duration::new(0, 0)))
}

/// Gets the local address of this socket.
Expand All @@ -421,12 +433,15 @@ impl TcpSocket {
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr().and_then(convert_address)
self.inner.get_local().and_then(convert_address)
}

/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.inner.take_error()
match self.inner.take_error() {
Ok(_) => Ok(None),
Err(err) => Ok(Some(err)),
}
}

/// Binds the socket to the given address.
Expand Down Expand Up @@ -519,6 +534,12 @@ impl TcpSocket {
let raw_socket = self.inner.into_raw_socket();
unsafe { mio::net::TcpStream::from_raw_socket(raw_socket) }
};
#[cfg(target_os = "wasi")]
let mio = {
use std::os::wasi::io::{FromRawFd, IntoRawFd};
let raw_fd = self.inner.into_raw_fd();
unsafe { mio::net::TcpStream::from_raw_fd(raw_fd) }
};

TcpStream::connect_mio(mio).await
}
Expand Down Expand Up @@ -575,6 +596,12 @@ impl TcpSocket {
let raw_socket = self.inner.into_raw_socket();
unsafe { mio::net::TcpListener::from_raw_socket(raw_socket) }
};
#[cfg(target_os = "wasi")]
let mio = {
use std::os::wasi::io::{FromRawFd, IntoRawFd};
let raw_fd = self.inner.into_raw_fd();
unsafe { mio::net::TcpListener::from_raw_fd(raw_fd) }
};

TcpListener::new(mio)
}
Expand Down Expand Up @@ -603,6 +630,7 @@ impl TcpSocket {
/// Ok(())
/// }
/// ```
#[cfg(not(target_os = "wasi"))]
pub fn from_std_stream(std_stream: std::net::TcpStream) -> TcpSocket {
#[cfg(unix)]
{
Expand All @@ -622,14 +650,8 @@ impl TcpSocket {
}
}

fn convert_address(address: socket2::SockAddr) -> io::Result<SocketAddr> {
match address.as_socket() {
Some(address) => Ok(address),
None => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid address family (not IPv4 or IPv6)",
)),
}
fn convert_address(address: SocketAddr) -> io::Result<SocketAddr> {
Ok(address)
}

impl fmt::Debug for TcpSocket {
Expand Down
Loading

0 comments on commit ba1555a

Please sign in to comment.