Skip to content

Commit

Permalink
fix: fix windows compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Oct 26, 2020
1 parent 2f4878a commit c18e4c9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 9 deletions.
48 changes: 39 additions & 9 deletions tentacle/src/runtime/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ pub use os::*;
mod os {
use async_io::Async;
use async_std::net::{TcpListener as AsyncListener, TcpStream as AsyncStream, ToSocketAddrs};
use futures::{AsyncRead as _, Future};
use futures::{
channel::{
mpsc::{channel, Receiver},
oneshot::{self, Sender},
},
future::select,
AsyncRead as _, FutureExt, SinkExt, StreamExt,
};
use std::{
pin::Pin,
task::{Context, Poll},
Expand All @@ -24,25 +31,47 @@ mod os {
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

#[derive(Debug)]
pub struct TcpListener(AsyncListener);
pub struct TcpListener(
Receiver<io::Result<(AsyncStream, SocketAddr)>>,
SocketAddr,
Sender<()>,
);

impl TcpListener {
fn new(listener: AsyncListener, local_addr: SocketAddr) -> TcpListener {
let (mut tx, rx) = channel(24);
let (tx_c, rx_c) = oneshot::channel::<()>();
let task = async move {
loop {
let res = listener.accept().await;
let _ignore = tx.send(res).await;
}
}
.boxed();
crate::runtime::spawn(select(task, rx_c));
TcpListener(rx, local_addr, tx_c)
}

pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
Ok(TcpListener(AsyncListener::bind(addrs).await?))
let listener = AsyncListener::bind(addrs).await?;
let local_addr = listener.local_addr()?;
Ok(Self::new(listener, local_addr))
}

pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.0.local_addr()
Ok(self.1)
}

pub fn poll_accept(
&mut self,
cx: &mut Context,
) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
unsafe {
Pin::new_unchecked(&mut self.0.accept())
.poll(cx)
.map_ok(|x| (TcpStream(x.0.compat_write()), x.1))
match self.0.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => {
Poll::Ready(res.map(|x| (TcpStream(x.0.compat_write()), x.1)))
}
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
}
}
}
Expand Down Expand Up @@ -97,7 +126,8 @@ mod os {
};

pub fn from_std(listen: StdListen) -> io::Result<TcpListener> {
Ok(TcpListener(AsyncListener::from(listen)))
let addr = listen.local_addr()?;
Ok(TcpListener::new(AsyncListener::from(listen), addr))
}

pub async fn connect_std(std_tcp: Socket, addr: &SocketAddr) -> io::Result<TcpStream> {
Expand Down
12 changes: 12 additions & 0 deletions tentacle/src/runtime/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,17 @@ pub fn from_std(listen: StdListen) -> io::Result<TcpListener> {
}

pub async fn connect_std(stream: Socket, addr: &SocketAddr) -> io::Result<TcpStream> {
// on windows, if not set reuse address, but use socket2 and tokio `connect_std` function
// will cause a error "Os {code: 10022, kind: InvalidInput, message: "An invalid parameter was provided." }"
// but if set, nothing happened, this is confusing behavior
// issue: https://github.com/tokio-rs/tokio/issues/3030
#[cfg(windows)]
if stream.reuse_address()? {
TcpStream::connect_std(stream.into_tcp_stream(), addr).await
} else {
TcpStream::connect(addr).await
}

#[cfg(unix)]
TcpStream::connect_std(stream.into_tcp_stream(), addr).await
}

0 comments on commit c18e4c9

Please sign in to comment.