Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix tls and ws listen poll #333

Merged
merged 2 commits into from
Sep 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ rand = "0.7.1"
futures = { version = "0.3.0" }
tokio = { version = "1.0.0", features = ["time", "io-util", "net", "rt-multi-thread"] }
tokio-util = { version = "0.6.0", features = ["codec"] }
crossbeam-channel = "0.3.6"
crossbeam-channel = "0.5"
env_logger = "0.6.0"
bytes = "1.0.0"
2 changes: 1 addition & 1 deletion tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ socket2 = { version = "0.4.0", optional = true }

[dev-dependencies]
env_logger = "0.6.0"
crossbeam-channel = "0.3.6"
crossbeam-channel = "0.5"
systemstat = "0.1.3"
futures-test = "0.3.5"

Expand Down
35 changes: 20 additions & 15 deletions tentacle/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,22 +407,27 @@ mod os {

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.get_mut() {
MultiIncoming::Tcp(inner) => match inner.poll_accept(cx)? {
// Why can't get the peer address of the connected stream ?
// Error will be "Transport endpoint is not connected",
// so why incoming will appear unconnected stream ?
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => Poll::Ready(Some(Ok((
socketaddr_to_multiaddr(remote_address),
MultiStream::Tcp(stream),
)))),
Err(err) => {
debug!("stream get peer address error: {:?}", err);
Poll::Pending
MultiIncoming::Tcp(inner) => {
loop {
match inner.poll_accept(cx)? {
// Why can't get the peer address of the connected stream ?
// Error will be "Transport endpoint is not connected",
// so why incoming will appear unconnected stream ?
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => {
break Poll::Ready(Some(Ok((
socketaddr_to_multiaddr(remote_address),
MultiStream::Tcp(stream),
))))
}
Err(err) => {
debug!("stream get peer address error: {:?}", err);
}
},
Poll::Pending => break Poll::Pending,
}
},
Poll::Pending => Poll::Pending,
},
}
}
MultiIncoming::Memory(inner) => match inner.poll_next_unpin(cx)? {
Poll::Ready(Some((addr, stream))) => {
Poll::Ready(Some(Ok((addr, MultiStream::Memory(stream)))))
Expand Down
85 changes: 49 additions & 36 deletions tentacle/src/transports/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,57 +106,70 @@ impl TlsListener {
}
}

fn poll_pending(
&mut self,
cx: &mut Context,
) -> Poll<Option<std::result::Result<(Multiaddr, TlsStream), io::Error>>> {
fn poll_pending(&mut self, cx: &mut Context) -> Poll<(Multiaddr, TlsStream)> {
match Pin::new(&mut self.pending_stream).as_mut().poll_next(cx) {
Poll::Ready(Some(res)) => Poll::Ready(Some(Ok(res))),
Poll::Ready(Some(res)) => Poll::Ready(res),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}

fn poll_listen(&mut self, cx: &mut Context) -> Poll<std::result::Result<(), io::Error>> {
match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => {
match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
let acceptor = TlsAcceptor::from(Arc::clone(&self.tls_config));
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, acceptor.accept(stream)).await {
Err(_) => warn!("accept tls server stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Tls(Cow::Borrowed("")));
if sender.send((addr, Box::new(stream))).await.is_err() {
warn!("receiver closed unexpectedly")
}
}
Err(err) => {
warn!("accept tls server stream err: {:?}", err);
}
},
}
});
}
Err(err) => {
warn!("stream get peer address error: {:?}", err);
}
}
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}

impl Stream for TlsListener {
type Item = std::result::Result<(Multiaddr, TlsStream), io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Poll::Ready(res) = self.poll_pending(cx) {
return Poll::Ready(res);
return Poll::Ready(Some(Ok(res)));
}

match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
let acceptor = TlsAcceptor::from(Arc::clone(&self.tls_config));
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, acceptor.accept(stream)).await {
Err(_) => warn!("accept tls server stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Tls(Cow::Borrowed("")));
if sender.send((addr, Box::new(stream))).await.is_err() {
warn!("receiver closed unexpectedly")
}
}
Err(err) => {
warn!("accept tls server stream err: {:?}", err);
}
},
}
});
self.poll_pending(cx)
loop {
let is_pending = self.poll_listen(cx)?.is_pending();
match self.poll_pending(cx) {
Poll::Ready(res) => return Poll::Ready(Some(Ok(res))),
Poll::Pending => {
if is_pending {
break;
}
}
Err(err) => {
warn!("stream get peer address error: {:?}", err);
Poll::Pending
}
},
Poll::Pending => Poll::Pending,
}
}
Poll::Pending
}
}

Expand Down
84 changes: 49 additions & 35 deletions tentacle/src/transports/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,56 +158,70 @@ impl WebsocketListener {
}
}

fn poll_pending(
&mut self,
cx: &mut Context,
) -> Poll<Option<std::result::Result<(Multiaddr, WsStream), io::Error>>> {
fn poll_pending(&mut self, cx: &mut Context) -> Poll<(Multiaddr, WsStream)> {
match Pin::new(&mut self.pending_stream).as_mut().poll_next(cx) {
Poll::Ready(Some(res)) => Poll::Ready(Some(Ok(res))),
Poll::Ready(Some(res)) => Poll::Ready(res),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}

fn poll_listen(&mut self, cx: &mut Context) -> Poll<std::result::Result<(), io::Error>> {
match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => {
match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, accept_async(stream)).await {
Err(_) => debug!("accept websocket stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Ws);
if sender.send((addr, WsStream::new(stream))).await.is_err()
{
debug!("receiver closed unexpectedly")
}
}
Err(err) => {
debug!("accept websocket stream err: {:?}", err);
}
},
}
});
}
Err(err) => {
debug!("stream get peer address error: {:?}", err);
}
}
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}

impl Stream for WebsocketListener {
type Item = std::result::Result<(Multiaddr, WsStream), io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Poll::Ready(res) = self.poll_pending(cx) {
return Poll::Ready(res);
return Poll::Ready(Some(Ok(res)));
}

match self.inner.poll_accept(cx)? {
Poll::Ready((stream, _)) => match stream.peer_addr() {
Ok(remote_address) => {
let timeout = self.timeout;
let mut sender = self.sender.clone();
crate::runtime::spawn(async move {
match crate::runtime::timeout(timeout, accept_async(stream)).await {
Err(_) => debug!("accept websocket stream timeout"),
Ok(res) => match res {
Ok(stream) => {
let mut addr = socketaddr_to_multiaddr(remote_address);
addr.push(Protocol::Ws);
if sender.send((addr, WsStream::new(stream))).await.is_err() {
debug!("receiver closed unexpectedly")
}
}
Err(err) => {
debug!("accept websocket stream err: {:?}", err);
}
},
}
});
self.poll_pending(cx)
}
Err(err) => {
debug!("stream get peer address error: {:?}", err);
Poll::Pending
loop {
let is_pending = self.poll_listen(cx)?.is_pending();
match self.poll_pending(cx) {
Poll::Ready(res) => return Poll::Ready(Some(Ok(res))),
Poll::Pending => {
if is_pending {
break;
}
}
},
Poll::Pending => Poll::Pending,
}
}
Poll::Pending
}
}

Expand Down
16 changes: 16 additions & 0 deletions tentacle/tests/certificates/node2-wrong/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICmzCCAkGgAwIBAgIBKjAKBggqhkjOPQQDAjCBhzELMAkGA1UEBgwCQ04xCzAJ
BgNVBAgMAlpKMQswCQYDVQQHDAJIWjENMAsGA1UECgwEQ0lUQTEaMBgGA1UECwwR
QmxvY2tjaGFpbkRldmVsb3AxMzAxBgNVBAMMKjB4MzdkMWM3NDQ5YmZlNzZmZTlj
NDQ1ZTYyNmRhMDYyNjVlOTM3NzYwMTAgFw03NTAxMDEwMDAwMDBaGA80MDk2MDEw
MTAwMDAwMFowgYcxCzAJBgNVBAYMAkNOMQswCQYDVQQIDAJaSjELMAkGA1UEBwwC
SFoxDTALBgNVBAoMBENJVEExGjAYBgNVBAsMEUJsb2NrY2hhaW5EZXZlbG9wMTMw
MQYDVQQDDCoweDM3ZDFjNzQ0OWJmZTc2ZmU5YzQ0NWU2MjZkYTA2MjY1ZTkzNzc2
MDEwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARqckVxxyXetaUkDFmotKVr+ryn
46Ab1HQMce1wavSMTXTfKFNF17yIbb/p+m/bMikDIDRlFdrPzCTItAxp2rvgo4GZ
MIGWMDUGA1UdEQQuMCyCKjB4MzdkMWM3NDQ5YmZlNzZmZTljNDQ1ZTYyNmRhMDYy
NjVlOTM3NzYwMTAdBgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwHQYDVR0O
BBYEFCDzfo3cfAjZX6IipouRFKxtECeeMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0P
AQH/BAQDAgWgMAoGCCqGSM49BAMCA0gAMEUCIEyBoZBxC0CSAYMDGs9bB8gv2kmq
yTrIjeqz3znDjrj3AiEA5rzZpZtgf0/Vj74V6l0Cv8B7Pa/yQ8z7kf8qNXL8d3o=
-----END CERTIFICATE-----
17 changes: 17 additions & 0 deletions tentacle/tests/certificates/node2-wrong/server.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICnTCCAkOgAwIBAgIBKjAKBggqhkjOPQQDAjCBhzELMAkGA1UEBgwCQ04xCzAJ
BgNVBAgMAlpKMQswCQYDVQQHDAJIWjENMAsGA1UECgwEQ0lUQTEaMBgGA1UECwwR
QmxvY2tjaGFpbkRldmVsb3AxMzAxBgNVBAMMKjB4MzdkMWM3NDQ5YmZlNzZmZTlj
NDQ1ZTYyNmRhMDYyNjVlOTM3NzYwMTAgFw03NTAxMDEwMDAwMDBaGA80MDk2MDEw
MTAwMDAwMFowgYcxCzAJBgNVBAYMAkNOMQswCQYDVQQIDAJaSjELMAkGA1UEBwwC
SFoxDTALBgNVBAoMBENJVEExGjAYBgNVBAsMEUJsb2NrY2hhaW5EZXZlbG9wMTMw
MQYDVQQDDCoweDMyOWY1Y2JmYTY3MWZiN2UzYmI2YmM0NTUxMGQ5NWY0YWM2YTZj
YjgwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASNZNyFgQF+nQPbM9Mhvq0CMNzj
BqWgG/vprroFWsllCfHM79VfYOx1Sn2FwKQWBoU5vNO4XFGnnhqbh6jXURPfo4Gb
MIGYMB8GA1UdIwQYMBaAFCDzfo3cfAjZX6IipouRFKxtECeeMDUGA1UdEQQuMCyC
KjB4MzI5ZjVjYmZhNjcxZmI3ZTNiYjZiYzQ1NTEwZDk1ZjRhYzZhNmNiODAdBgNV
HSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUwAwEBADAOBgNV
HQ8BAf8EBAMCBaAwCgYIKoZIzj0EAwIDSAAwRQIgcsxlB9Hc8uhDt/pogD0kaZ+o
wTdCK+v9AKEOzNrmaQICIQCKf3bs5EwZbucSyr6/udPPXrALnKgo1+oAQQiwffY6
0A==
-----END CERTIFICATE-----
5 changes: 5 additions & 0 deletions tentacle/tests/certificates/node2-wrong/server.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgsD0e5mrPUvCcwczo
B4Ot+0nstgq+aJ5Pnhxn//oA9WuhRANCAASNZNyFgQF+nQPbM9Mhvq0CMNzjBqWg
G/vprroFWsllCfHM79VfYOx1Sn2FwKQWBoU5vNO4XFGnnhqbh6jXURPf
-----END PRIVATE KEY-----
Loading