Skip to content

Commit

Permalink
Merge pull request #333 from nervosnetwork/fix-ws-tls-listen-poll
Browse files Browse the repository at this point in the history
fix: fix tls and ws listen poll
  • Loading branch information
driftluo authored Sep 26, 2021
2 parents 2efa2e5 + 3bc0036 commit e46e6aa
Show file tree
Hide file tree
Showing 11 changed files with 572 additions and 109 deletions.
2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ rand = "0.7.1"
futures = { version = "0.3.0" }
tokio = { version = "1.0.0", features = ["time", "io-util", "net", "rt-multi-thread"] }
tokio-util = { version = "0.6.0", features = ["codec"] }
crossbeam-channel = "0.3.6"
crossbeam-channel = "0.5"
env_logger = "0.6.0"
bytes = "1.0.0"
2 changes: 1 addition & 1 deletion tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ socket2 = { version = "0.4.0", optional = true }

[dev-dependencies]
env_logger = "0.6.0"
crossbeam-channel = "0.3.6"
crossbeam-channel = "0.5"
systemstat = "0.1.3"
futures-test = "0.3.5"

Expand Down
4 changes: 2 additions & 2 deletions tentacle/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub enum TargetProtocol {
/// Try open one protocol
Single(ProtocolId),
/// Try open some protocol, if return true, open it
Filter(Box<dyn Fn(&ProtocolId) -> bool + Send>),
Filter(Box<dyn Fn(&ProtocolId) -> bool + Sync + Send + 'static>),
}

impl From<ProtocolId> for TargetProtocol {
Expand All @@ -143,7 +143,7 @@ pub enum TargetSession {
/// Try send to only one
Single(SessionId),
/// Try send to some session, if return true, send to it
Filter(Box<dyn Fn(&SessionId) -> bool + Send>),
Filter(Box<dyn Fn(&SessionId) -> bool + Sync + Send + 'static>),
}

impl From<SessionId> for TargetSession {
Expand Down
35 changes: 20 additions & 15 deletions tentacle/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,22 +407,27 @@ mod os {

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.get_mut() {
MultiIncoming::Tcp(inner) => match inner.poll_accept(cx)? {
// Why can't get the peer address of the connected stream ?
// Error will be "Transport endpoint is not connected",
// so why incoming will appear unconnected stream ?
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => Poll::Ready(Some(Ok((
socketaddr_to_multiaddr(remote_address),
MultiStream::Tcp(stream),
)))),
Err(err) => {
debug!("stream get peer address error: {:?}", err);
Poll::Pending
MultiIncoming::Tcp(inner) => {
loop {
match inner.poll_accept(cx)? {
// Why can't get the peer address of the connected stream ?
// Error will be "Transport endpoint is not connected",
// so why incoming will appear unconnected stream ?
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => {
break Poll::Ready(Some(Ok((
socketaddr_to_multiaddr(remote_address),
MultiStream::Tcp(stream),
))))
}
Err(err) => {
debug!("stream get peer address error: {:?}", err);
}
},
Poll::Pending => break Poll::Pending,
}
},
Poll::Pending => Poll::Pending,
},
}
}
MultiIncoming::Memory(inner) => match inner.poll_next_unpin(cx)? {
Poll::Ready(Some((addr, stream))) => {
Poll::Ready(Some(Ok((addr, MultiStream::Memory(stream)))))
Expand Down
85 changes: 49 additions & 36 deletions tentacle/src/transports/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,57 +106,70 @@ impl TlsListener {
}
}

fn poll_pending(
&mut self,
cx: &mut Context,
) -> Poll<Option<std::result::Result<(Multiaddr, TlsStream), io::Error>>> {
fn poll_pending(&mut self, cx: &mut Context) -> Poll<(Multiaddr, TlsStream)> {
match Pin::new(&mut self.pending_stream).as_mut().poll_next(cx) {
Poll::Ready(Some(res)) => Poll::Ready(Some(Ok(res))),
Poll::Ready(Some(res)) => Poll::Ready(res),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}

fn poll_listen(&mut self, cx: &mut Context) -> Poll<std::result::Result<(), io::Error>> {
match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => {
match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
let acceptor = TlsAcceptor::from(Arc::clone(&self.tls_config));
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, acceptor.accept(stream)).await {
Err(_) => warn!("accept tls server stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Tls(Cow::Borrowed("")));
if sender.send((addr, Box::new(stream))).await.is_err() {
warn!("receiver closed unexpectedly")
}
}
Err(err) => {
warn!("accept tls server stream err: {:?}", err);
}
},
}
});
}
Err(err) => {
warn!("stream get peer address error: {:?}", err);
}
}
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}

impl Stream for TlsListener {
type Item = std::result::Result<(Multiaddr, TlsStream), io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Poll::Ready(res) = self.poll_pending(cx) {
return Poll::Ready(res);
return Poll::Ready(Some(Ok(res)));
}

match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
let acceptor = TlsAcceptor::from(Arc::clone(&self.tls_config));
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, acceptor.accept(stream)).await {
Err(_) => warn!("accept tls server stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Tls(Cow::Borrowed("")));
if sender.send((addr, Box::new(stream))).await.is_err() {
warn!("receiver closed unexpectedly")
}
}
Err(err) => {
warn!("accept tls server stream err: {:?}", err);
}
},
}
});
self.poll_pending(cx)
loop {
let is_pending = self.poll_listen(cx)?.is_pending();
match self.poll_pending(cx) {
Poll::Ready(res) => return Poll::Ready(Some(Ok(res))),
Poll::Pending => {
if is_pending {
break;
}
}
Err(err) => {
warn!("stream get peer address error: {:?}", err);
Poll::Pending
}
},
Poll::Pending => Poll::Pending,
}
}
Poll::Pending
}
}

Expand Down
84 changes: 49 additions & 35 deletions tentacle/src/transports/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,56 +158,70 @@ impl WebsocketListener {
}
}

fn poll_pending(
&mut self,
cx: &mut Context,
) -> Poll<Option<std::result::Result<(Multiaddr, WsStream), io::Error>>> {
fn poll_pending(&mut self, cx: &mut Context) -> Poll<(Multiaddr, WsStream)> {
match Pin::new(&mut self.pending_stream).as_mut().poll_next(cx) {
Poll::Ready(Some(res)) => Poll::Ready(Some(Ok(res))),
Poll::Ready(Some(res)) => Poll::Ready(res),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}

fn poll_listen(&mut self, cx: &mut Context) -> Poll<std::result::Result<(), io::Error>> {
match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => {
match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, accept_async(stream)).await {
Err(_) => debug!("accept websocket stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Ws);
if sender.send((addr, WsStream::new(stream))).await.is_err()
{
debug!("receiver closed unexpectedly")
}
}
Err(err) => {
debug!("accept websocket stream err: {:?}", err);
}
},
}
});
}
Err(err) => {
debug!("stream get peer address error: {:?}", err);
}
}
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}

impl Stream for WebsocketListener {
type Item = std::result::Result<(Multiaddr, WsStream), io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Poll::Ready(res) = self.poll_pending(cx) {
return Poll::Ready(res);
return Poll::Ready(Some(Ok(res)));
}

match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, accept_async(stream)).await {
Err(_) => debug!("accept websocket stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Ws);
if sender.send((addr, WsStream::new(stream))).await.is_err() {
debug!("receiver closed unexpectedly")
}
}
Err(err) => {
debug!("accept websocket stream err: {:?}", err);
}
},
}
});
self.poll_pending(cx)
}
Err(err) => {
debug!("stream get peer address error: {:?}", err);
Poll::Pending
loop {
let is_pending = self.poll_listen(cx)?.is_pending();
match self.poll_pending(cx) {
Poll::Ready(res) => return Poll::Ready(Some(Ok(res))),
Poll::Pending => {
if is_pending {
break;
}
}
},
Poll::Pending => Poll::Pending,
}
}
Poll::Pending
}
}

Expand Down
16 changes: 16 additions & 0 deletions tentacle/tests/certificates/node2-wrong/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICmzCCAkGgAwIBAgIBKjAKBggqhkjOPQQDAjCBhzELMAkGA1UEBgwCQ04xCzAJ
BgNVBAgMAlpKMQswCQYDVQQHDAJIWjENMAsGA1UECgwEQ0lUQTEaMBgGA1UECwwR
QmxvY2tjaGFpbkRldmVsb3AxMzAxBgNVBAMMKjB4MzdkMWM3NDQ5YmZlNzZmZTlj
NDQ1ZTYyNmRhMDYyNjVlOTM3NzYwMTAgFw03NTAxMDEwMDAwMDBaGA80MDk2MDEw
MTAwMDAwMFowgYcxCzAJBgNVBAYMAkNOMQswCQYDVQQIDAJaSjELMAkGA1UEBwwC
SFoxDTALBgNVBAoMBENJVEExGjAYBgNVBAsMEUJsb2NrY2hhaW5EZXZlbG9wMTMw
MQYDVQQDDCoweDM3ZDFjNzQ0OWJmZTc2ZmU5YzQ0NWU2MjZkYTA2MjY1ZTkzNzc2
MDEwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARqckVxxyXetaUkDFmotKVr+ryn
46Ab1HQMce1wavSMTXTfKFNF17yIbb/p+m/bMikDIDRlFdrPzCTItAxp2rvgo4GZ
MIGWMDUGA1UdEQQuMCyCKjB4MzdkMWM3NDQ5YmZlNzZmZTljNDQ1ZTYyNmRhMDYy
NjVlOTM3NzYwMTAdBgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwHQYDVR0O
BBYEFCDzfo3cfAjZX6IipouRFKxtECeeMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0P
AQH/BAQDAgWgMAoGCCqGSM49BAMCA0gAMEUCIEyBoZBxC0CSAYMDGs9bB8gv2kmq
yTrIjeqz3znDjrj3AiEA5rzZpZtgf0/Vj74V6l0Cv8B7Pa/yQ8z7kf8qNXL8d3o=
-----END CERTIFICATE-----
17 changes: 17 additions & 0 deletions tentacle/tests/certificates/node2-wrong/server.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICnTCCAkOgAwIBAgIBKjAKBggqhkjOPQQDAjCBhzELMAkGA1UEBgwCQ04xCzAJ
BgNVBAgMAlpKMQswCQYDVQQHDAJIWjENMAsGA1UECgwEQ0lUQTEaMBgGA1UECwwR
QmxvY2tjaGFpbkRldmVsb3AxMzAxBgNVBAMMKjB4MzdkMWM3NDQ5YmZlNzZmZTlj
NDQ1ZTYyNmRhMDYyNjVlOTM3NzYwMTAgFw03NTAxMDEwMDAwMDBaGA80MDk2MDEw
MTAwMDAwMFowgYcxCzAJBgNVBAYMAkNOMQswCQYDVQQIDAJaSjELMAkGA1UEBwwC
SFoxDTALBgNVBAoMBENJVEExGjAYBgNVBAsMEUJsb2NrY2hhaW5EZXZlbG9wMTMw
MQYDVQQDDCoweDMyOWY1Y2JmYTY3MWZiN2UzYmI2YmM0NTUxMGQ5NWY0YWM2YTZj
YjgwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASNZNyFgQF+nQPbM9Mhvq0CMNzj
BqWgG/vprroFWsllCfHM79VfYOx1Sn2FwKQWBoU5vNO4XFGnnhqbh6jXURPfo4Gb
MIGYMB8GA1UdIwQYMBaAFCDzfo3cfAjZX6IipouRFKxtECeeMDUGA1UdEQQuMCyC
KjB4MzI5ZjVjYmZhNjcxZmI3ZTNiYjZiYzQ1NTEwZDk1ZjRhYzZhNmNiODAdBgNV
HSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUwAwEBADAOBgNV
HQ8BAf8EBAMCBaAwCgYIKoZIzj0EAwIDSAAwRQIgcsxlB9Hc8uhDt/pogD0kaZ+o
wTdCK+v9AKEOzNrmaQICIQCKf3bs5EwZbucSyr6/udPPXrALnKgo1+oAQQiwffY6
0A==
-----END CERTIFICATE-----
5 changes: 5 additions & 0 deletions tentacle/tests/certificates/node2-wrong/server.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgsD0e5mrPUvCcwczo
B4Ot+0nstgq+aJ5Pnhxn//oA9WuhRANCAASNZNyFgQF+nQPbM9Mhvq0CMNzjBqWg
G/vprroFWsllCfHM79VfYOx1Sn2FwKQWBoU5vNO4XFGnnhqbh6jXURPf
-----END PRIVATE KEY-----
38 changes: 19 additions & 19 deletions tentacle/tests/test_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,11 @@ fn test(secio: bool, shutdown: bool) {

let (addr_sender, addr_receiver) = channel::<Multiaddr>();

let handle = thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let rt = tokio::runtime::Runtime::new().unwrap();
let async_handle = rt.handle().clone();

let handle = thread::spawn(move || {
async_handle.block_on(async move {
let listen_addr = service_1
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.await
Expand All @@ -151,32 +153,30 @@ fn test(secio: bool, shutdown: bool) {
let listen_addr_3 = listen_addr.clone();
let listen_addr_4 = listen_addr.clone();

start_service(service_2, listen_addr_2);
start_service(service_3, listen_addr_3);
start_service(service_4, listen_addr_4);
start_service(service_5, listen_addr);
let async_handle = rt.handle();
start_service(service_2, listen_addr_2, async_handle);
start_service(service_3, listen_addr_3, async_handle);
start_service(service_4, listen_addr_4, async_handle);
start_service(service_5, listen_addr, async_handle);

handle.join().expect("test fail");
}

fn start_service<F>(
mut service: Service<F>,
listen_addr: Multiaddr,
) -> ::std::thread::JoinHandle<()>
where
handle: &tokio::runtime::Handle,
) where
F: ServiceHandle + Unpin + Send + 'static,
{
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
service
.dial(listen_addr, TargetProtocol::All)
.await
.unwrap();
handle.spawn(async move {
service
.dial(listen_addr, TargetProtocol::All)
.await
.unwrap();

service.run().await
});
})
service.run().await;
});
}

#[test]
Expand Down
Loading

0 comments on commit e46e6aa

Please sign in to comment.