Skip to content

Commit

Permalink
Merge pull request #275 from nervosnetwork/fix-win-compatible
Browse files Browse the repository at this point in the history
fix: Fix win compatible
  • Loading branch information
driftluo authored Nov 2, 2020
2 parents 2766462 + 7926cbc commit 9cf837a
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 15 deletions.
63 changes: 54 additions & 9 deletions tentacle/src/runtime/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ pub use os::*;
mod os {
use async_io::Async;
use async_std::net::{TcpListener as AsyncListener, TcpStream as AsyncStream, ToSocketAddrs};
use futures::{AsyncRead as _, Future};
use futures::{
channel::{
mpsc::{channel, Receiver},
oneshot::{self, Sender},
},
future::select,
AsyncRead as _, FutureExt, SinkExt, StreamExt,
};
use std::{
pin::Pin,
task::{Context, Poll},
Expand All @@ -24,25 +31,62 @@ mod os {
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

#[derive(Debug)]
pub struct TcpListener(AsyncListener);
pub struct TcpListener {
/// Why does this need to be handled here?
///
/// https://www.driftluo.com/article/9e85ea7c-219a-4b25-ab32-e66c5d3027d0
///
/// Not only because of idempotent operation reasons, at the same time,
/// after the task is registered to the event monitor, the relationship between
/// the task and the corresponding waker needs to be ensured. If the task is dropped
/// immediately after registration, the waker cannot wake up the corresponding task.
///
/// Since the async-std api was designed without leaving the corresponding poll interface,
/// this will force users to ensure that they are used in an async environment
recv: Receiver<io::Result<(AsyncStream, SocketAddr)>>,
local_addr: SocketAddr,
close_sender: Sender<()>,
}

impl TcpListener {
fn new(listener: AsyncListener, local_addr: SocketAddr) -> TcpListener {
let (mut tx, rx) = channel(24);
let (tx_c, rx_c) = oneshot::channel::<()>();
let task = async move {
loop {
let res = listener.accept().await;
let _ignore = tx.send(res).await;
}
}
.boxed();
crate::runtime::spawn(select(task, rx_c));
TcpListener {
recv: rx,
local_addr,
close_sender: tx_c,
}
}

pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
Ok(TcpListener(AsyncListener::bind(addrs).await?))
let listener = AsyncListener::bind(addrs).await?;
let local_addr = listener.local_addr()?;
Ok(Self::new(listener, local_addr))
}

pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.0.local_addr()
Ok(self.local_addr)
}

pub fn poll_accept(
&mut self,
cx: &mut Context,
) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
unsafe {
Pin::new_unchecked(&mut self.0.accept())
.poll(cx)
.map_ok(|x| (TcpStream(x.0.compat_write()), x.1))
match self.recv.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => {
Poll::Ready(res.map(|x| (TcpStream(x.0.compat_write()), x.1)))
}
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
}
}
}
Expand Down Expand Up @@ -97,7 +141,8 @@ mod os {
};

pub fn from_std(listen: StdListen) -> io::Result<TcpListener> {
Ok(TcpListener(AsyncListener::from(listen)))
let addr = listen.local_addr()?;
Ok(TcpListener::new(AsyncListener::from(listen), addr))
}

pub async fn connect_std(std_tcp: Socket, addr: &SocketAddr) -> io::Result<TcpStream> {
Expand Down
12 changes: 7 additions & 5 deletions tentacle/src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
#![allow(dead_code)]

#[cfg(feature = "async-runtime")]
#[cfg(all(not(target_arch = "wasm32"), feature = "async-runtime"))]
mod async_runtime;
#[cfg(any(feature = "generic-timer", target_arch = "wasm32"))]
#[cfg(any(
feature = "generic-timer",
all(target_arch = "wasm32", feature = "wasm-timer")
))]
mod generic_timer;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-runtime"))]
mod tokio_runtime;
#[cfg(target_arch = "wasm32")]
mod wasm_runtime;

#[cfg(feature = "async-runtime")]
#[cfg(all(not(target_arch = "wasm32"), feature = "async-runtime"))]
pub use async_runtime::*;
#[cfg(any(
feature = "generic-timer",
target_arch = "wasm32",
feature = "wasm-timers"
all(target_arch = "wasm32", feature = "wasm-timer")
))]
pub use generic_timer::*;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio-runtime"))]
Expand Down
12 changes: 12 additions & 0 deletions tentacle/src/runtime/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,17 @@ pub fn from_std(listen: StdListen) -> io::Result<TcpListener> {
}

pub async fn connect_std(stream: Socket, addr: &SocketAddr) -> io::Result<TcpStream> {
// on windows, if not set reuse address, but use socket2 and tokio `connect_std` function
// will cause a error "Os {code: 10022, kind: InvalidInput, message: "An invalid parameter was provided." }"
// but if set, nothing happened, this is confusing behavior
// issue: https://github.com/tokio-rs/tokio/issues/3030
#[cfg(windows)]
if stream.reuse_address()? {
TcpStream::connect_std(stream.into_tcp_stream(), addr).await
} else {
TcpStream::connect(addr).await
}

#[cfg(unix)]
TcpStream::connect_std(stream.into_tcp_stream(), addr).await
}
2 changes: 1 addition & 1 deletion tentacle/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
mod browser;
#[cfg(not(target_arch = "wasm32"))]
mod tcp;
#[cfg(all(feature = "ws", not(target_os = "unknown")))]
#[cfg(all(feature = "ws", not(target_arch = "wasm32")))]
mod ws;

#[cfg(target_arch = "wasm32")]
Expand Down

0 comments on commit 9cf837a

Please sign in to comment.