From 31ae533c917755d7ef51e0b0eece8ff2651d1dd6 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Jun 2021 10:49:27 +0200 Subject: [PATCH 01/34] feat(ws client): support redirections --- ws-client/src/transport.rs | 85 ++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 291d530283..ba51f3b80c 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -24,6 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::stream::EitherStream; use crate::tokio::{TcpStream, TlsStream}; use futures::io::{BufReader, BufWriter}; use futures::prelude::*; @@ -194,44 +195,30 @@ impl<'a> WsTransportClientBuilder<'a> { sockaddr: SocketAddr, tls_connector: &Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { - // Try establish the TCP connection. - let tcp_stream = { - let socket = TcpStream::connect(sockaddr); - let timeout = crate::tokio::sleep(self.timeout); - futures::pin_mut!(socket, timeout); - match future::select(socket, timeout).await { - future::Either::Left((socket, _)) => { - let socket = socket?; - if let Err(err) = socket.set_nodelay(true) { - log::warn!("set nodelay failed: {:?}", err); - } - match tls_connector { - None => TlsOrPlain::Plain(socket), - Some(connector) => { - let dns_name = crate::tokio::DNSNameRef::try_from_ascii_str(&self.target.host)?; - let tls_stream = connector.connect(dns_name, socket).await?; - TlsOrPlain::Tls(tls_stream) - } - } - } - future::Either::Right((_, _)) => return Err(WsHandshakeError::Timeout(self.timeout)), - } - }; + let mut path = self.target.path.clone(); - let mut client = - WsRawClient::new(BufReader::new(BufWriter::new(tcp_stream)), &self.target.host_header, &self.target.path); - if let Some(origin) = self.origin_header.as_ref() { - client.set_origin(origin); - } + let client = loop { + // Try establish the TCP connection. + let tcp_stream = connect(sockaddr, self.timeout, &self.target.host, tls_connector).await?; - // Perform the initial handshake. - match client.handshake().await? { - ServerResponse::Accepted { .. } => {} - ServerResponse::Rejected { status_code } | ServerResponse::Redirect { status_code, .. } => { - // TODO: HTTP redirects also lead here #339. - return Err(WsHandshakeError::Rejected { status_code }); + let mut client = + WsRawClient::new(BufReader::new(BufWriter::new(tcp_stream)), &self.target.host_header, &path); + if let Some(origin) = self.origin_header.as_ref() { + client.set_origin(origin); } - } + // Perform the initial handshake. + match client.handshake().await? { + ServerResponse::Accepted { .. } => break Ok(client), + ServerResponse::Rejected { status_code } => { + break Err(WsHandshakeError::Rejected { status_code }); + } + ServerResponse::Redirect { status_code, location } => { + log::trace!("recv redirection: status_code: {}, location: {}", status_code, location); + log::debug!("trying to reconnect to redirection: {}", location); + path = location; + } + }; + }?; // If the handshake succeeded, return. let mut builder = client.into_builder(); @@ -241,6 +228,34 @@ impl<'a> WsTransportClientBuilder<'a> { } } +async fn connect( + sockaddr: SocketAddr, + timeout_dur: Duration, + host: &str, + tls_connector: &Option, +) -> Result>, WsHandshakeError> { + let socket = TcpStream::connect(sockaddr); + let timeout = crate::tokio::sleep(timeout_dur); + futures::pin_mut!(socket, timeout); + Ok(match future::select(socket, timeout).await { + future::Either::Left((socket, _)) => { + let socket = socket?; + if let Err(err) = socket.set_nodelay(true) { + log::warn!("set nodelay failed: {:?}", err); + } + match tls_connector { + None => TlsOrPlain::Plain(socket), + Some(connector) => { + let dns_name = crate::tokio::DNSNameRef::try_from_ascii_str(host)?; + let tls_stream = connector.connect(dns_name, socket).await?; + TlsOrPlain::Tls(tls_stream) + } + } + } + future::Either::Right((_, _)) => return Err(WsHandshakeError::Timeout(timeout_dur)), + }) +} + impl From for WsHandshakeError { fn from(err: io::Error) -> WsHandshakeError { WsHandshakeError::Io(err) From 753699c59679caf188a4c96cbd2ffe0f00268e12 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Jun 2021 12:03:04 +0200 Subject: [PATCH 02/34] reuse socket --- ws-client/src/transport.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index ba51f3b80c..bfd1a55ae9 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -196,13 +196,16 @@ impl<'a> WsTransportClientBuilder<'a> { tls_connector: &Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { let mut path = self.target.path.clone(); + // Try establish the TCP connection. - let client = loop { - // Try establish the TCP connection. + let mut socket = { let tcp_stream = connect(sockaddr, self.timeout, &self.target.host, tls_connector).await?; + Some(BufReader::new(BufWriter::new(tcp_stream))) + }; - let mut client = - WsRawClient::new(BufReader::new(BufWriter::new(tcp_stream)), &self.target.host_header, &path); + let client = loop { + let sock = socket.take(); + let mut client = WsRawClient::new(sock.expect("is Some; qed"), &self.target.host_header, &path); if let Some(origin) = self.origin_header.as_ref() { client.set_origin(origin); } @@ -213,6 +216,7 @@ impl<'a> WsTransportClientBuilder<'a> { break Err(WsHandshakeError::Rejected { status_code }); } ServerResponse::Redirect { status_code, location } => { + socket = Some(client.into_inner()); log::trace!("recv redirection: status_code: {}, location: {}", status_code, location); log::debug!("trying to reconnect to redirection: {}", location); path = location; From b6f09da4b3cbde32fe103b3d19e1c3ce1a2630e8 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Jun 2021 12:06:07 +0200 Subject: [PATCH 03/34] reuse socket --- ws-client/src/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index bfd1a55ae9..211dd501ec 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -196,8 +196,8 @@ impl<'a> WsTransportClientBuilder<'a> { tls_connector: &Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { let mut path = self.target.path.clone(); - // Try establish the TCP connection. + // NOTE(niklasad1): this in an `Option` to be able to reuse to tcp_stream. let mut socket = { let tcp_stream = connect(sockaddr, self.timeout, &self.target.host, tls_connector).await?; Some(BufReader::new(BufWriter::new(tcp_stream))) From ce77bf5864ef08dd68cf3303c9baa6f92b9b4a44 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Jun 2021 13:09:26 +0200 Subject: [PATCH 04/34] add hacks --- ws-client/src/transport.rs | 47 +++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 211dd501ec..93d1dc58b7 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -192,20 +192,17 @@ impl<'a> WsTransportClientBuilder<'a> { async fn try_connect( &self, - sockaddr: SocketAddr, + mut sockaddr: SocketAddr, tls_connector: &Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { let mut path = self.target.path.clone(); - - // NOTE(niklasad1): this in an `Option` to be able to reuse to tcp_stream. - let mut socket = { - let tcp_stream = connect(sockaddr, self.timeout, &self.target.host, tls_connector).await?; - Some(BufReader::new(BufWriter::new(tcp_stream))) - }; + let mut host = self.target.host.clone(); + let mut host_header = self.target.host_header.clone(); + let mut tls_connector = tls_connector.clone(); let client = loop { - let sock = socket.take(); - let mut client = WsRawClient::new(sock.expect("is Some; qed"), &self.target.host_header, &path); + let tcp_stream = connect(sockaddr, self.timeout, &host, tls_connector).await?; + let mut client = WsRawClient::new(BufReader::new(BufWriter::new(tcp_stream)), &host_header, &path); if let Some(origin) = self.origin_header.as_ref() { client.set_origin(origin); } @@ -216,10 +213,38 @@ impl<'a> WsTransportClientBuilder<'a> { break Err(WsHandshakeError::Rejected { status_code }); } ServerResponse::Redirect { status_code, location } => { - socket = Some(client.into_inner()); log::trace!("recv redirection: status_code: {}, location: {}", status_code, location); log::debug!("trying to reconnect to redirection: {}", location); - path = location; + match url::Url::parse(&location) { + // absolute URL => need to lookup sockaddr. + // TODO: this is hacky, we need to actually test all sockaddrs + Ok(url) => { + let target = Target::parse(url)?; + // TODO. + sockaddr = target.sockaddrs[0]; + path = target.path; + host = target.host; + host_header = target.host_header; + tls_connector = match target.mode { + Mode::Tls => { + let mut client_config = rustls::ClientConfig::default(); + if let CertificateStore::Native = self.certificate_store { + client_config.root_store = rustls_native_certs::load_native_certs() + .map_err(|(_, e)| WsHandshakeError::CertificateStore(e))?; + } + Some(Arc::new(client_config).into()) + } + Mode::Plain => None, + }; + } + // URL is relative, just set it as location. + Err( + url::ParseError::RelativeUrlWithCannotBeABaseBase | url::ParseError::RelativeUrlWithoutBase, + ) => { + path = location; + } + Err(e) => return Err(WsHandshakeError::Url(format!("Invalid URL: {}", e).into())), + }; } }; }?; From 67b550ee46517dc9e13cec509776f9a2c71a4e40 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Jun 2021 13:42:50 +0200 Subject: [PATCH 05/34] fix build --- ws-client/src/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 93d1dc58b7..ce3705bbde 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -201,7 +201,7 @@ impl<'a> WsTransportClientBuilder<'a> { let mut tls_connector = tls_connector.clone(); let client = loop { - let tcp_stream = connect(sockaddr, self.timeout, &host, tls_connector).await?; + let tcp_stream = connect(sockaddr, self.timeout, &host, &tls_connector).await?; let mut client = WsRawClient::new(BufReader::new(BufWriter::new(tcp_stream)), &host_header, &path); if let Some(origin) = self.origin_header.as_ref() { client.set_origin(origin); From e9d863f37528b2143eb57703b126a60498468092 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 30 Jun 2021 23:01:16 +0200 Subject: [PATCH 06/34] remove hacks --- ws-client/src/transport.rs | 74 +++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index ce3705bbde..22eadd5c9c 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -30,6 +30,7 @@ use futures::io::{BufReader, BufWriter}; use futures::prelude::*; use soketto::connection; use soketto::handshake::client::{Client as WsRawClient, ServerResponse}; +use std::path::{Path, PathBuf}; use std::{borrow::Cow, io, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; @@ -108,7 +109,7 @@ pub enum WsHandshakeError { #[error("Invalid DNS name: {}", 0)] InvalidDnsName(#[source] crate::tokio::InvalidDNSNameError), - /// RawServer rejected our handshake. + /// Server rejected the handshake. #[error("Connection rejected with status code: {}", status_code)] Rejected { /// HTTP status code that the server returned. @@ -175,34 +176,32 @@ impl<'a> WsTransportClientBuilder<'a> { Mode::Plain => None, }; - let mut err = None; - for sockaddr in &self.target.sockaddrs { - match self.try_connect(*sockaddr, &connector).await { - Ok(res) => return Ok(res), - Err(e) => { - log::debug!("Failed to connect to sockaddr: {:?} with err: {:?}", sockaddr, e); - err = Some(Err(e)); - } - } - } - // NOTE(niklasad1): this is most likely unreachable because [`Url::socket_addrs`] doesn't - // return an empty `Vec` if no socket address was found for the host name. - err.unwrap_or(Err(WsHandshakeError::NoAddressFound(self.target.host))) + self.try_connect(connector).await } async fn try_connect( - &self, - mut sockaddr: SocketAddr, - tls_connector: &Option, + self, + mut tls_connector: Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { - let mut path = self.target.path.clone(); - let mut host = self.target.host.clone(); - let mut host_header = self.target.host_header.clone(); - let mut tls_connector = tls_connector.clone(); + let mut sockaddrs = self.target.sockaddrs; + let mut path = PathBuf::from(self.target.path); + let mut host = self.target.host; + let mut host_header = self.target.host_header; + + let mut err = Err(None); let client = loop { + let sockaddr = match sockaddrs.pop() { + Some(addr) => addr, + None => break err, + }; + let tcp_stream = connect(sockaddr, self.timeout, &host, &tls_connector).await?; - let mut client = WsRawClient::new(BufReader::new(BufWriter::new(tcp_stream)), &host_header, &path); + let mut client = WsRawClient::new( + BufReader::new(BufWriter::new(tcp_stream)), + &host_header, + path.to_str().expect("valid UTF-8 checked by Url::parse; qed"), + ); if let Some(origin) = self.origin_header.as_ref() { client.set_origin(origin); } @@ -210,19 +209,16 @@ impl<'a> WsTransportClientBuilder<'a> { match client.handshake().await? { ServerResponse::Accepted { .. } => break Ok(client), ServerResponse::Rejected { status_code } => { - break Err(WsHandshakeError::Rejected { status_code }); + err = Err(Some(WsHandshakeError::Rejected { status_code })); } ServerResponse::Redirect { status_code, location } => { log::trace!("recv redirection: status_code: {}, location: {}", status_code, location); - log::debug!("trying to reconnect to redirection: {}", location); match url::Url::parse(&location) { - // absolute URL => need to lookup sockaddr. - // TODO: this is hacky, we need to actually test all sockaddrs + // redirection with absolute path => need to lookup. Ok(url) => { let target = Target::parse(url)?; - // TODO. - sockaddr = target.sockaddrs[0]; - path = target.path; + sockaddrs = target.sockaddrs; + path = PathBuf::from(target.path); host = target.host; host_header = target.host_header; tls_connector = match target.mode { @@ -237,17 +233,23 @@ impl<'a> WsTransportClientBuilder<'a> { Mode::Plain => None, }; } - // URL is relative, just set it as location. - Err( - url::ParseError::RelativeUrlWithCannotBeABaseBase | url::ParseError::RelativeUrlWithoutBase, - ) => { - path = location; + // redirection is relative, either `/baz` or `bar`. + Err(_) => { + // replace the entire path if `location` is `/`. + if location.starts_with('/') { + path = location.into(); + } else { + // join paths such that the leaf is replaced with `location`. + let strip_last_child = + Path::new(&path).ancestors().nth(1).unwrap_or_else(|| Path::new("/")); + path = strip_last_child.join(location); + } } - Err(e) => return Err(WsHandshakeError::Url(format!("Invalid URL: {}", e).into())), }; } }; - }?; + } + .map_err(|e| e.unwrap_or(WsHandshakeError::NoAddressFound(host)))?; // If the handshake succeeded, return. let mut builder = client.into_builder(); From 5a1e118f5f55e7faa733451b9cc3a0061ebb0814 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 30 Aug 2021 17:49:42 +0200 Subject: [PATCH 07/34] fix bad merge --- ws-client/src/transport.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index bad7213c39..47e5efc910 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -24,6 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::stream::EitherStream; use futures::io::{BufReader, BufWriter}; use futures::prelude::*; use soketto::connection; @@ -38,7 +39,7 @@ use tokio_rustls::{ TlsConnector, }; -type TlsOrPlain = crate::stream::EitherStream>; +type TlsOrPlain = EitherStream>; /// Sending end of WebSocket transport. #[derive(Debug)] @@ -185,10 +186,10 @@ impl<'a> WsTransportClientBuilder<'a> { async fn try_connect( self, - mut tls_connector: Option, + mut tls_connector: Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { let mut sockaddrs = self.target.sockaddrs; - let mut path = PathBuf::from(self.target.path); + let mut path_and_query = PathBuf::from(self.target.path_and_query); let mut host = self.target.host; let mut host_header = self.target.host_header; @@ -204,7 +205,7 @@ impl<'a> WsTransportClientBuilder<'a> { let mut client = WsRawClient::new( BufReader::new(BufWriter::new(tcp_stream)), &host_header, - path.to_str().expect("valid UTF-8 checked by Url::parse; qed"), + path_and_query.to_str().expect("valid UTF-8 checked by Url::parse; qed"), ); if let Some(origin) = self.origin_header.as_ref() { client.set_origin(origin); @@ -222,7 +223,7 @@ impl<'a> WsTransportClientBuilder<'a> { Ok(url) => { let target = Target::parse(url)?; sockaddrs = target.sockaddrs; - path = PathBuf::from(target.path); + path_and_query = PathBuf::from(target.path_and_query); host = target.host; host_header = target.host_header; tls_connector = match target.mode { @@ -241,12 +242,12 @@ impl<'a> WsTransportClientBuilder<'a> { Err(_) => { // replace the entire path if `location` is `/`. if location.starts_with('/') { - path = location.into(); + path_and_query = location.into(); } else { // join paths such that the leaf is replaced with `location`. let strip_last_child = - Path::new(&path).ancestors().nth(1).unwrap_or_else(|| Path::new("/")); - path = strip_last_child.join(location); + Path::new(&path_and_query).ancestors().nth(1).unwrap_or_else(|| Path::new("/")); + path_and_query = strip_last_child.join(location); } } }; @@ -267,10 +268,10 @@ async fn connect( sockaddr: SocketAddr, timeout_dur: Duration, host: &str, - tls_connector: &Option, + tls_connector: &Option, ) -> Result>, WsHandshakeError> { let socket = TcpStream::connect(sockaddr); - let timeout = crate::tokio::sleep(timeout_dur); + let timeout = tokio::time::sleep(timeout_dur); futures::pin_mut!(socket, timeout); Ok(match future::select(socket, timeout).await { future::Either::Left((socket, _)) => { @@ -281,7 +282,7 @@ async fn connect( match tls_connector { None => TlsOrPlain::Plain(socket), Some(connector) => { - let dns_name = crate::tokio::DNSNameRef::try_from_ascii_str(host)?; + let dns_name = DNSNameRef::try_from_ascii_str(host)?; let tls_stream = connector.connect(dns_name, socket).await?; TlsOrPlain::Tls(tls_stream) } From 9e506c181533f66f540804432ab141cda0547939 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 30 Aug 2021 19:23:29 +0200 Subject: [PATCH 08/34] address grumbles --- ws-client/src/transport.rs | 52 +++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 47e5efc910..a503c55527 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -26,9 +26,8 @@ use crate::stream::EitherStream; use futures::io::{BufReader, BufWriter}; -use futures::prelude::*; use soketto::connection; -use soketto::handshake::client::{Client as WsRawClient, ServerResponse}; +use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; use std::path::{Path, PathBuf}; use std::{borrow::Cow, io, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; @@ -41,6 +40,9 @@ use tokio_rustls::{ type TlsOrPlain = EitherStream>; +// TODO(niklasad1): make this configurable. +const MAX_REDIRECTIONS_ALLOWED: usize = 5; + /// Sending end of WebSocket transport. #[derive(Debug)] pub struct Sender { @@ -193,16 +195,16 @@ impl<'a> WsTransportClientBuilder<'a> { let mut host = self.target.host; let mut host_header = self.target.host_header; - let mut err = Err(None); + let mut err = None; - let client = loop { + for _ in 0..MAX_REDIRECTIONS_ALLOWED { let sockaddr = match sockaddrs.pop() { Some(addr) => addr, - None => break err, + None => return err.unwrap_or(Err(WsHandshakeError::NoAddressFound(host))), }; let tcp_stream = connect(sockaddr, self.timeout, &host, &tls_connector).await?; - let mut client = WsRawClient::new( + let mut client = WsHandshakeClient::new( BufReader::new(BufWriter::new(tcp_stream)), &host_header, path_and_query.to_str().expect("valid UTF-8 checked by Url::parse; qed"), @@ -212,9 +214,15 @@ impl<'a> WsTransportClientBuilder<'a> { } // Perform the initial handshake. match client.handshake().await? { - ServerResponse::Accepted { .. } => break Ok(client), + ServerResponse::Accepted { .. } => { + let mut builder = client.into_builder(); + builder.set_max_message_size(self.max_request_body_size as usize); + let (sender, receiver) = builder.finish(); + return Ok((Sender { inner: sender }, Receiver { inner: receiver })); + } + ServerResponse::Rejected { status_code } => { - err = Err(Some(WsHandshakeError::Rejected { status_code })); + err = Some(Err(WsHandshakeError::Rejected { status_code })); } ServerResponse::Redirect { status_code, location } => { log::trace!("recv redirection: status_code: {}, location: {}", status_code, location); @@ -239,7 +247,9 @@ impl<'a> WsTransportClientBuilder<'a> { }; } // redirection is relative, either `/baz` or `bar`. - Err(_) => { + Err( + url::ParseError::RelativeUrlWithoutBase | url::ParseError::RelativeUrlWithCannotBeABaseBase, + ) => { // replace the entire path if `location` is `/`. if location.starts_with('/') { path_and_query = location.into(); @@ -250,17 +260,14 @@ impl<'a> WsTransportClientBuilder<'a> { path_and_query = strip_last_child.join(location); } } + Err(e) => { + err = Some(Err(WsHandshakeError::Url(e.to_string().into()))); + } }; } }; } - .map_err(|e| e.unwrap_or(WsHandshakeError::NoAddressFound(host)))?; - - // If the handshake succeeded, return. - let mut builder = client.into_builder(); - builder.set_max_message_size(self.max_request_body_size as usize); - let (sender, receiver) = builder.finish(); - Ok((Sender { inner: sender }, Receiver { inner: receiver })) + err.unwrap_or(Err(WsHandshakeError::NoAddressFound(host))) } } @@ -272,24 +279,23 @@ async fn connect( ) -> Result>, WsHandshakeError> { let socket = TcpStream::connect(sockaddr); let timeout = tokio::time::sleep(timeout_dur); - futures::pin_mut!(socket, timeout); - Ok(match future::select(socket, timeout).await { - future::Either::Left((socket, _)) => { + tokio::select! { + socket = socket => { let socket = socket?; if let Err(err) = socket.set_nodelay(true) { log::warn!("set nodelay failed: {:?}", err); } match tls_connector { - None => TlsOrPlain::Plain(socket), + None => Ok(TlsOrPlain::Plain(socket)), Some(connector) => { let dns_name = DNSNameRef::try_from_ascii_str(host)?; let tls_stream = connector.connect(dns_name, socket).await?; - TlsOrPlain::Tls(tls_stream) + Ok(TlsOrPlain::Tls(tls_stream)) } } } - future::Either::Right((_, _)) => return Err(WsHandshakeError::Timeout(timeout_dur)), - }) + _ = timeout => Err(WsHandshakeError::Timeout(timeout_dur)) + } } impl From for WsHandshakeError { From d27a7087ec674a7a3eed6963bd745d2a26bf35ef Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 29 Sep 2021 11:58:18 +0200 Subject: [PATCH 09/34] fix grumbles --- ws-client/src/transport.rs | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 8a9f3f9b71..445f326a15 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -28,7 +28,7 @@ use crate::stream::EitherStream; use futures::io::{BufReader, BufWriter}; use soketto::connection; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::{borrow::Cow, io, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; use tokio::net::TcpStream; @@ -195,24 +195,21 @@ impl<'a> WsTransportClientBuilder<'a> { self, mut tls_connector: Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { - let mut sockaddrs = self.target.sockaddrs; - let mut path_and_query = PathBuf::from(self.target.path_and_query); - let mut host = self.target.host; - let mut host_header = self.target.host_header; + let mut target = self.target; let mut err = None; for _ in 0..MAX_REDIRECTIONS_ALLOWED { - let sockaddr = match sockaddrs.pop() { + let sockaddr = match target.sockaddrs.pop() { Some(addr) => addr, - None => return err.unwrap_or(Err(WsHandshakeError::NoAddressFound(host))), + None => return err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))), }; - let tcp_stream = connect(sockaddr, self.timeout, &host, &tls_connector).await?; + let tcp_stream = connect(sockaddr, self.timeout, &target.host, &tls_connector).await?; let mut client = WsHandshakeClient::new( BufReader::new(BufWriter::new(tcp_stream)), - &host_header, - path_and_query.to_str().expect("valid UTF-8 checked by Url::parse; qed"), + &target.host_header, + &target.path_and_query, ); if let Some(origin) = self.origin_header.as_ref() { client.set_origin(origin); @@ -235,10 +232,6 @@ impl<'a> WsTransportClientBuilder<'a> { // redirection with absolute path => need to lookup. Ok(url) => { let target = Target::parse(url)?; - sockaddrs = target.sockaddrs; - path_and_query = PathBuf::from(target.path_and_query); - host = target.host; - host_header = target.host_header; tls_connector = match target.mode { Mode::Tls => { let mut client_config = rustls::ClientConfig::default(); @@ -257,12 +250,18 @@ impl<'a> WsTransportClientBuilder<'a> { ) => { // replace the entire path if `location` is `/`. if location.starts_with('/') { - path_and_query = location.into(); + target.path_and_query = location.into(); } else { // join paths such that the leaf is replaced with `location`. - let strip_last_child = - Path::new(&path_and_query).ancestors().nth(1).unwrap_or_else(|| Path::new("/")); - path_and_query = strip_last_child.join(location); + let strip_last_child = Path::new(&target.path_and_query) + .ancestors() + .nth(1) + .unwrap_or_else(|| Path::new("/")); + target.path_and_query = strip_last_child + .join(location) + .to_str() + .expect("valid UTF-8 checked by Url::parse; qed") + .to_string(); } } Err(e) => { @@ -272,7 +271,7 @@ impl<'a> WsTransportClientBuilder<'a> { } }; } - err.unwrap_or(Err(WsHandshakeError::NoAddressFound(host))) + err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))) } } From 1fce702c29bb4c69a8e37d3c81898eaa153992b7 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 29 Sep 2021 12:14:27 +0200 Subject: [PATCH 10/34] fix grumbles --- ws-client/src/transport.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 445f326a15..734ff2d4d4 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -205,7 +205,13 @@ impl<'a> WsTransportClientBuilder<'a> { None => return err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))), }; - let tcp_stream = connect(sockaddr, self.timeout, &target.host, &tls_connector).await?; + let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await { + Ok(stream) => stream, + Err(e) => { + err = Some(Err(e)); + continue; + } + }; let mut client = WsHandshakeClient::new( BufReader::new(BufWriter::new(tcp_stream)), &target.host_header, @@ -215,18 +221,18 @@ impl<'a> WsTransportClientBuilder<'a> { client.set_origin(origin); } // Perform the initial handshake. - match client.handshake().await? { - ServerResponse::Accepted { .. } => { + match client.handshake().await { + Ok(ServerResponse::Accepted { .. }) => { let mut builder = client.into_builder(); builder.set_max_message_size(self.max_request_body_size as usize); let (sender, receiver) = builder.finish(); return Ok((Sender { inner: sender }, Receiver { inner: receiver })); } - ServerResponse::Rejected { status_code } => { + Ok(ServerResponse::Rejected { status_code }) => { err = Some(Err(WsHandshakeError::Rejected { status_code })); } - ServerResponse::Redirect { status_code, location } => { + Ok(ServerResponse::Redirect { status_code, location }) => { log::trace!("recv redirection: status_code: {}, location: {}", status_code, location); match url::Url::parse(&location) { // redirection with absolute path => need to lookup. @@ -269,6 +275,9 @@ impl<'a> WsTransportClientBuilder<'a> { } }; } + Err(e) => { + err = Some(Err(e.into())); + } }; } err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))) From 250c896d2ee0b15afc590ec0e8d139f7156c9f2c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 29 Sep 2021 12:29:00 +0200 Subject: [PATCH 11/34] fix nit --- ws-client/src/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 734ff2d4d4..67eb9225ce 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -256,7 +256,7 @@ impl<'a> WsTransportClientBuilder<'a> { ) => { // replace the entire path if `location` is `/`. if location.starts_with('/') { - target.path_and_query = location.into(); + target.path_and_query = location; } else { // join paths such that the leaf is replaced with `location`. let strip_last_child = Path::new(&target.path_and_query) From d200744243518059ca525a406650b81b0a85a3a6 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Sep 2021 16:33:01 +0200 Subject: [PATCH 12/34] add redirection test --- test-utils/Cargo.toml | 2 +- test-utils/src/types.rs | 56 +++++++++++++++++++++++++++++++++++++- ws-client/src/tests.rs | 15 ++++++++++ ws-client/src/transport.rs | 17 ++++++++++-- 4 files changed, 85 insertions(+), 5 deletions(-) diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 2dba6c16fa..e4bfac7f27 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -15,6 +15,6 @@ hyper = { version = "0.14.10", features = ["full"] } log = "0.4" serde = { version = "1", default-features = false, features = ["derive"] } serde_json = "1" -soketto = "0.6" +soketto = { version = "0.7", features = ["http"] } tokio = { version = "1", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.6", features = ["compat"] } diff --git a/test-utils/src/types.rs b/test-utils/src/types.rs index 5d3da21d6f..de9da0c0a9 100644 --- a/test-utils/src/types.rs +++ b/test-utils/src/types.rs @@ -34,7 +34,7 @@ use futures_util::{ stream::{self, StreamExt}, }; use serde::{Deserialize, Serialize}; -use soketto::handshake::{self, server::Response, Error as SokettoError, Server}; +use soketto::handshake::{self, http::is_upgrade_request, server::Response, Error as SokettoError, Server}; use std::io; use std::net::SocketAddr; use std::time::Duration; @@ -314,3 +314,57 @@ async fn connection_task(socket: tokio::net::TcpStream, mode: ServerMode, mut ex } } } + +// Run a `WebSocket server` that performs some re-directions for testing. +pub fn ws_server_with_redirect(other_server: String) -> String { + let addr = ([127, 0, 0, 1], 0).into(); + + let service = hyper::service::make_service_fn(move |_| { + let other_server = other_server.clone(); + async move { + Ok::<_, hyper::Error>(hyper::service::service_fn(move |req| { + let other_server = other_server.clone(); + async move { handler(req, other_server).await } + })) + } + }); + let server = hyper::Server::bind(&addr).serve(service); + let addr = server.local_addr(); + + tokio::spawn(async move { server.await }); + format!("ws://{}", addr) +} + +/// Handle incoming HTTP Requests. +async fn handler( + req: hyper::Request, + other_server: String, +) -> Result, soketto::BoxedError> { + if is_upgrade_request(&req) { + match req.uri().to_string().as_str() { + "/myblock/two" => { + let response = hyper::Response::builder() + .status(301) + .header("Location", other_server) + .body(Body::empty()) + .unwrap(); + Ok(response) + } + "/myblock/one" => { + let response = + hyper::Response::builder().status(301).header("Location", "two").body(Body::empty()).unwrap(); + Ok(response) + } + _ => { + let response = hyper::Response::builder() + .status(301) + .header("Location", "/myblock/one") + .body(Body::empty()) + .unwrap(); + Ok(response) + } + } + } else { + panic!("expect upgrade to WS"); + } +} diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 2b9f74cbee..7e88e47f12 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -263,3 +263,18 @@ fn assert_error_response(err: Error, exp: ErrorObject) { e => panic!("Expected error: \"{}\", got: {:?}", err, e), }; } + +#[tokio::test] +async fn redirections() { + let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), String::new()) + .with_default_timeout() + .await + .unwrap(); + + let server_url = format!("ws://{}", server.local_addr()); + let redirect_url = jsonrpsee_test_utils::types::ws_server_with_redirect(server_url); + + // The client will first connect to a server that only performs re-directions and finally + // redirect to another server to complete the handshake. + assert!(matches!(WsClientBuilder::default().build(&redirect_url).with_default_timeout().await, Ok(Ok(_)))); +} diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 67eb9225ce..652f307657 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -196,18 +196,25 @@ impl<'a> WsTransportClientBuilder<'a> { mut tls_connector: Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { let mut target = self.target; + let mut used_sockaddrs = Vec::new(); let mut err = None; for _ in 0..MAX_REDIRECTIONS_ALLOWED { + log::debug!("Connecting to target: {:?}", target); + let sockaddr = match target.sockaddrs.pop() { - Some(addr) => addr, + Some(addr) => { + used_sockaddrs.push(addr); + addr + } None => return err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))), }; let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await { Ok(stream) => stream, Err(e) => { + log::error!("Failed to connect to sockaddr: {:?}", sockaddr); err = Some(Err(e)); continue; } @@ -223,6 +230,7 @@ impl<'a> WsTransportClientBuilder<'a> { // Perform the initial handshake. match client.handshake().await { Ok(ServerResponse::Accepted { .. }) => { + log::info!("Connection established to target: {:?}", target); let mut builder = client.into_builder(); builder.set_max_message_size(self.max_request_body_size as usize); let (sender, receiver) = builder.finish(); @@ -230,14 +238,16 @@ impl<'a> WsTransportClientBuilder<'a> { } Ok(ServerResponse::Rejected { status_code }) => { + log::debug!("Connection rejected: {:?}", status_code); err = Some(Err(WsHandshakeError::Rejected { status_code })); } Ok(ServerResponse::Redirect { status_code, location }) => { - log::trace!("recv redirection: status_code: {}, location: {}", status_code, location); + log::trace!("redirection: status_code: {}, location: {}", status_code, location); match url::Url::parse(&location) { // redirection with absolute path => need to lookup. Ok(url) => { - let target = Target::parse(url)?; + target = Target::parse(url)?; + used_sockaddrs.clear(); tls_connector = match target.mode { Mode::Tls => { let mut client_config = rustls::ClientConfig::default(); @@ -269,6 +279,7 @@ impl<'a> WsTransportClientBuilder<'a> { .expect("valid UTF-8 checked by Url::parse; qed") .to_string(); } + std::mem::swap(&mut target.sockaddrs, &mut used_sockaddrs); } Err(e) => { err = Some(Err(WsHandshakeError::Url(e.to_string().into()))); From 33f364cd62c2c28cdd608f3c0ce134af93a418af Mon Sep 17 00:00:00 2001 From: David Date: Fri, 1 Oct 2021 13:13:27 +0200 Subject: [PATCH 13/34] Update test-utils/src/types.rs --- test-utils/src/types.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test-utils/src/types.rs b/test-utils/src/types.rs index de9da0c0a9..04aff7c384 100644 --- a/test-utils/src/types.rs +++ b/test-utils/src/types.rs @@ -315,7 +315,8 @@ async fn connection_task(socket: tokio::net::TcpStream, mode: ServerMode, mut ex } } -// Run a `WebSocket server` that performs some re-directions for testing. +// Run a WebSocket server running on localhost that redirects requests for testing. +// Requests to any url except for `/myblock/two` will redirect one or two times (HTTP 301) and eventually end up in `/myblock/two`. pub fn ws_server_with_redirect(other_server: String) -> String { let addr = ([127, 0, 0, 1], 0).into(); From 9d9aabac1bcbc78aae1145e07f6f5301216ac148 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 1 Oct 2021 13:13:46 +0200 Subject: [PATCH 14/34] Resolved todo --- ws-client/src/transport.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 652f307657..8440746f1c 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -40,7 +40,6 @@ use tokio_rustls::{ type TlsOrPlain = EitherStream>; -// TODO(niklasad1): make this configurable. const MAX_REDIRECTIONS_ALLOWED: usize = 5; /// Sending end of WebSocket transport. From 720f709296a7e251948f1e2881c620bd7fce6f37 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 1 Oct 2021 13:48:07 +0200 Subject: [PATCH 15/34] Check that redirected client actually works --- ws-client/Cargo.toml | 1 + ws-client/src/tests.rs | 22 +++++++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/ws-client/Cargo.toml b/ws-client/Cargo.toml index aee5e93c56..866e414e2a 100644 --- a/ws-client/Cargo.toml +++ b/ws-client/Cargo.toml @@ -30,3 +30,4 @@ rustls-native-certs = "0.5.0" [dev-dependencies] jsonrpsee-test-utils = { path = "../test-utils" } +env_logger = "0.9" diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 7e88e47f12..583303c027 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -266,15 +266,27 @@ fn assert_error_response(err: Error, exp: ErrorObject) { #[tokio::test] async fn redirections() { - let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), String::new()) - .with_default_timeout() - .await - .unwrap(); + let expected = "abc 123"; + let server = WebSocketTestServer::with_hardcoded_response( + "127.0.0.1:0".parse().unwrap(), + ok_response(expected.into(), Id::Num(0)), + ) + .with_default_timeout() + .await + .unwrap(); let server_url = format!("ws://{}", server.local_addr()); let redirect_url = jsonrpsee_test_utils::types::ws_server_with_redirect(server_url); // The client will first connect to a server that only performs re-directions and finally // redirect to another server to complete the handshake. - assert!(matches!(WsClientBuilder::default().build(&redirect_url).with_default_timeout().await, Ok(Ok(_)))); + let client = WsClientBuilder::default().build(&redirect_url).with_default_timeout().await; + // It's a client + assert!(matches!(client, Ok(Ok(_)))); + let client = client.expect("tokio timers work").expect("WsClient builder works"); + // It's connected + assert!(client.is_connected()); + // It works + let response = client.request::("anything", ParamsSer::NoParams).with_default_timeout().await.unwrap(); + assert_eq!(response.unwrap(), String::from(expected)); } From 21583d115a3f3197d3f5357e734fea1bbccbfb4a Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 1 Oct 2021 13:58:52 +0200 Subject: [PATCH 16/34] Rename test-utils "types" to "mocks" --- http-client/src/tests.rs | 2 +- http-server/src/tests.rs | 2 +- test-utils/src/helpers.rs | 2 +- test-utils/src/lib.rs | 2 +- test-utils/src/{types.rs => mocks.rs} | 0 ws-client/src/tests.rs | 4 ++-- ws-server/src/tests.rs | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) rename test-utils/src/{types.rs => mocks.rs} (100%) diff --git a/http-client/src/tests.rs b/http-client/src/tests.rs index 8ce53edb98..a7ba27cc8a 100644 --- a/http-client/src/tests.rs +++ b/http-client/src/tests.rs @@ -31,7 +31,7 @@ use crate::types::{ }; use crate::HttpClientBuilder; use jsonrpsee_test_utils::helpers::*; -use jsonrpsee_test_utils::types::Id; +use jsonrpsee_test_utils::mocks::Id; use jsonrpsee_test_utils::TimeoutFutureExt; #[tokio::test] diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index f122128e76..55dcc9834c 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -32,7 +32,7 @@ use crate::types::error::{CallError, Error}; use crate::{server::StopHandle, HttpServerBuilder, RpcModule}; use jsonrpsee_test_utils::helpers::*; -use jsonrpsee_test_utils::types::{Id, StatusCode, TestContext}; +use jsonrpsee_test_utils::mocks::{Id, StatusCode, TestContext}; use jsonrpsee_test_utils::TimeoutFutureExt; use serde_json::Value as JsonValue; use tokio::task::JoinHandle; diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index 06cbe068b3..7cb3e7521f 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -24,7 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::types::{Body, HttpResponse, Id, Uri}; +use crate::mocks::{Body, HttpResponse, Id, Uri}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Request, Response, Server}; use serde_json::Value; diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 6f6133eeaa..c47211bc62 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -32,7 +32,7 @@ use std::{future::Future, time::Duration}; use tokio::time::{timeout, Timeout}; pub mod helpers; -pub mod types; +pub mod mocks; /// Helper extension trait which allows to limit execution time for the futures. /// It is helpful in tests to ensure that no future will ever get stuck forever. diff --git a/test-utils/src/types.rs b/test-utils/src/mocks.rs similarity index 100% rename from test-utils/src/types.rs rename to test-utils/src/mocks.rs diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 583303c027..26420b9ff5 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -32,7 +32,7 @@ use crate::types::{ }; use crate::WsClientBuilder; use jsonrpsee_test_utils::helpers::*; -use jsonrpsee_test_utils::types::{Id, WebSocketTestServer}; +use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer}; use jsonrpsee_test_utils::TimeoutFutureExt; use serde_json::Value as JsonValue; @@ -276,7 +276,7 @@ async fn redirections() { .unwrap(); let server_url = format!("ws://{}", server.local_addr()); - let redirect_url = jsonrpsee_test_utils::types::ws_server_with_redirect(server_url); + let redirect_url = jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url); // The client will first connect to a server that only performs re-directions and finally // redirect to another server to complete the handshake. diff --git a/ws-server/src/tests.rs b/ws-server/src/tests.rs index d1b9cc7dbd..5739cd37ba 100644 --- a/ws-server/src/tests.rs +++ b/ws-server/src/tests.rs @@ -30,7 +30,7 @@ use crate::types::error::{CallError, Error}; use crate::{future::StopHandle, RpcModule, WsServerBuilder}; use anyhow::anyhow; use jsonrpsee_test_utils::helpers::*; -use jsonrpsee_test_utils::types::{Id, TestContext, WebSocketTestClient, WebSocketTestError}; +use jsonrpsee_test_utils::mocks::{Id, TestContext, WebSocketTestClient, WebSocketTestError}; use jsonrpsee_test_utils::TimeoutFutureExt; use serde_json::Value as JsonValue; use std::fmt; From f25a22d6d72facad86a249e9be830b8c9a695681 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 1 Oct 2021 14:15:29 +0200 Subject: [PATCH 17/34] Fix windows test (?) --- ws-client/src/tests.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 26420b9ff5..787dcd2a5c 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -281,9 +281,11 @@ async fn redirections() { // The client will first connect to a server that only performs re-directions and finally // redirect to another server to complete the handshake. let client = WsClientBuilder::default().build(&redirect_url).with_default_timeout().await; - // It's a client - assert!(matches!(client, Ok(Ok(_)))); - let client = client.expect("tokio timers work").expect("WsClient builder works"); + // It's an ok client + let client = match client { + Ok(Ok(client)) => client, + _ => panic!("WsClient builder failed") + }; // It's connected assert!(client.is_connected()); // It works From 0d54d2e55063eed9d33622cb43bb7c097dab2584 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 1 Oct 2021 14:31:13 +0200 Subject: [PATCH 18/34] fmt --- ws-client/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 787dcd2a5c..9388456b30 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -284,7 +284,7 @@ async fn redirections() { // It's an ok client let client = match client { Ok(Ok(client)) => client, - _ => panic!("WsClient builder failed") + _ => panic!("WsClient builder failed"), }; // It's connected assert!(client.is_connected()); From b93287c529cec6a5b4b88fc666a9a39e9c634428 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 1 Oct 2021 15:26:04 +0200 Subject: [PATCH 19/34] What is wrong with you windows? --- ws-client/src/tests.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 9388456b30..084b2aa428 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -284,7 +284,8 @@ async fn redirections() { // It's an ok client let client = match client { Ok(Ok(client)) => client, - _ => panic!("WsClient builder failed"), + Ok(Err(e)) => panic!("WsClient builder failed with: {:?}", e), + Err(e) => panic!("WsClient builder timed out with: {:?}", e), }; // It's connected assert!(client.is_connected()); From 4df383ffd4a7d148ba8b9f0206b546351350b0b9 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 1 Oct 2021 15:50:05 +0200 Subject: [PATCH 20/34] Ignore redirect test on windows --- ws-client/src/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 084b2aa428..caabbe43d3 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -264,6 +264,7 @@ fn assert_error_response(err: Error, exp: ErrorObject) { }; } +#[cfg_attr(target_os = "windows", ignore)] #[tokio::test] async fn redirections() { let expected = "abc 123"; From ae2791c4d8341608467a69f1ebc0715e0ecde04d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Oct 2021 09:54:40 +0200 Subject: [PATCH 21/34] fix bad transport errors --- ws-client/src/tests.rs | 3 ++- ws-client/src/transport.rs | 23 ++++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index caabbe43d3..6394580505 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -264,9 +264,10 @@ fn assert_error_response(err: Error, exp: ErrorObject) { }; } -#[cfg_attr(target_os = "windows", ignore)] +//#[cfg_attr(target_os = "windows", ignore)] #[tokio::test] async fn redirections() { + env_logger::try_init(); let expected = "abc 123"; let server = WebSocketTestServer::with_hardcoded_response( "127.0.0.1:0".parse().unwrap(), diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 8440746f1c..9e3ff8a0e9 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -96,42 +96,42 @@ pub enum CertificateStore { #[derive(Debug, Error)] pub enum WsHandshakeError { /// Failed to load system certs - #[error("Failed to load system certs: {}", 0)] + #[error("Failed to load system certs: {0}")] CertificateStore(io::Error), /// Invalid URL. - #[error("Invalid url: {}", 0)] + #[error("Invalid URL: {0}")] Url(Cow<'static, str>), /// Error when opening the TCP socket. - #[error("Error when opening the TCP socket: {}", 0)] + #[error("Error when opening the TCP socket: {0}")] Io(io::Error), /// Error in the transport layer. - #[error("Error in the WebSocket handshake: {}", 0)] + #[error("Error in the WebSocket handshake: {0}")] Transport(#[source] soketto::handshake::Error), /// Invalid DNS name error for TLS - #[error("Invalid DNS name: {}", 0)] + #[error("Invalid DNS name: {0}")] InvalidDnsName(#[source] InvalidDNSNameError), /// Server rejected the handshake. - #[error("Connection rejected with status code: {}", status_code)] + #[error("Connection rejected with status code: {status_code}")] Rejected { /// HTTP status code that the server returned. status_code: u16, }, /// Timeout while trying to connect. - #[error("Connection timeout exceeded: {}", 0)] + #[error("Connection timeout exceeded: {0:?}")] Timeout(Duration), /// Failed to resolve IP addresses for this hostname. - #[error("Failed to resolve IP addresses for this hostname: {}", 0)] + #[error("Failed to resolve IP addresses for this hostname: {0}")] ResolutionFailed(io::Error), /// Couldn't find any IP address for this hostname. - #[error("No IP address found for this hostname: {}", 0)] + #[error("No IP address found for this hostname: {0}")] NoAddressFound(String), } @@ -200,7 +200,8 @@ impl<'a> WsTransportClientBuilder<'a> { let mut err = None; for _ in 0..MAX_REDIRECTIONS_ALLOWED { - log::debug!("Connecting to target: {:?}", target); + // TODO(niklasad1): this should be debug. + log::info!("Connecting to target: {:?}", target); let sockaddr = match target.sockaddrs.pop() { Some(addr) => { @@ -241,7 +242,7 @@ impl<'a> WsTransportClientBuilder<'a> { err = Some(Err(WsHandshakeError::Rejected { status_code })); } Ok(ServerResponse::Redirect { status_code, location }) => { - log::trace!("redirection: status_code: {}, location: {}", status_code, location); + log::trace!("Redirection: status_code: {}, location: {}", status_code, location); match url::Url::parse(&location) { // redirection with absolute path => need to lookup. Ok(url) => { From 0d4b18bf5f4a3b4d9ae49b163d4605756e3a741f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Oct 2021 11:37:06 +0200 Subject: [PATCH 22/34] debug windows tests --- ws-client/src/transport.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 9e3ff8a0e9..7363dc606d 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -201,7 +201,7 @@ impl<'a> WsTransportClientBuilder<'a> { for _ in 0..MAX_REDIRECTIONS_ALLOWED { // TODO(niklasad1): this should be debug. - log::info!("Connecting to target: {:?}", target); + log::error!("Connecting to target: {:?}", target); let sockaddr = match target.sockaddrs.pop() { Some(addr) => { @@ -214,6 +214,7 @@ impl<'a> WsTransportClientBuilder<'a> { let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await { Ok(stream) => stream, Err(e) => { + // TODO(niklasad1): this should be debug. log::error!("Failed to connect to sockaddr: {:?}", sockaddr); err = Some(Err(e)); continue; @@ -230,7 +231,8 @@ impl<'a> WsTransportClientBuilder<'a> { // Perform the initial handshake. match client.handshake().await { Ok(ServerResponse::Accepted { .. }) => { - log::info!("Connection established to target: {:?}", target); + // TODO(niklasad1): this should be debug. + log::error!("Connection established to target: {:?}", target); let mut builder = client.into_builder(); builder.set_max_message_size(self.max_request_body_size as usize); let (sender, receiver) = builder.finish(); @@ -238,7 +240,8 @@ impl<'a> WsTransportClientBuilder<'a> { } Ok(ServerResponse::Rejected { status_code }) => { - log::debug!("Connection rejected: {:?}", status_code); + // TODO(niklasad1): this should be debug. + log::error!("Connection rejected: {:?}", status_code); err = Some(Err(WsHandshakeError::Rejected { status_code })); } Ok(ServerResponse::Redirect { status_code, location }) => { From adbd2c6df6aca78484f7a873ad4ad0dd02d453e1 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Oct 2021 12:22:13 +0200 Subject: [PATCH 23/34] update soketto --- types/Cargo.toml | 2 +- ws-client/Cargo.toml | 2 +- ws-client/src/client.rs | 10 ++++++---- ws-client/src/transport.rs | 10 +++++----- ws-server/Cargo.toml | 2 +- 5 files changed, 14 insertions(+), 12 deletions(-) diff --git a/types/Cargo.toml b/types/Cargo.toml index cd18e52861..a170456b0a 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -19,5 +19,5 @@ log = { version = "0.4", default-features = false } serde = { version = "1", default-features = false, features = ["derive"] } serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value", "std"] } thiserror = "1.0" -soketto = "0.6" +soketto = "0.7" hyper = "0.14.10" diff --git a/ws-client/Cargo.toml b/ws-client/Cargo.toml index 866e414e2a..833de6231b 100644 --- a/ws-client/Cargo.toml +++ b/ws-client/Cargo.toml @@ -21,7 +21,7 @@ jsonrpsee-types = { path = "../types", version = "0.3.0" } log = "0.4" serde = "1" serde_json = "1" -soketto = "0.6" +soketto = "0.7" pin-project = "1" thiserror = "1" url = "2" diff --git a/ws-client/src/client.rs b/ws-client/src/client.rs index 048c99a83d..7f7ed1b773 100644 --- a/ws-client/src/client.rs +++ b/ws-client/src/client.rs @@ -49,7 +49,9 @@ use futures::{ use tokio::sync::Mutex; use serde::de::DeserializeOwned; -use std::{borrow::Cow, time::Duration}; +use std::time::Duration; + +pub use soketto::handshake::client::Header; /// Wrapper over a [`oneshot::Receiver`](futures::channel::oneshot::Receiver) that reads /// the underlying channel once and then stores the result in String. @@ -106,7 +108,7 @@ pub struct WsClientBuilder<'a> { max_request_body_size: u32, request_timeout: Duration, connection_timeout: Duration, - origin_header: Option>, + origin_header: Option>, max_concurrent_requests: usize, max_notifs_per_subscription: usize, } @@ -151,8 +153,8 @@ impl<'a> WsClientBuilder<'a> { } /// Set origin header to pass during the handshake. - pub fn origin_header(mut self, origin: &'a str) -> Self { - self.origin_header = Some(Cow::Borrowed(origin)); + pub fn origin_header(mut self, origin: Header<'a>) -> Self { + self.origin_header = Some(origin); self } diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 7363dc606d..c9778f75ea 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -24,7 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::stream::EitherStream; +use crate::{client::Header, stream::EitherStream}; use futures::io::{BufReader, BufWriter}; use soketto::connection; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; @@ -65,7 +65,7 @@ pub struct WsTransportClientBuilder<'a> { pub timeout: Duration, /// `Origin` header to pass during the HTTP handshake. If `None`, no /// `Origin` header is passed. - pub origin_header: Option>, + pub origin_header: Option>, /// Max payload size pub max_request_body_size: u32, } @@ -196,7 +196,7 @@ impl<'a> WsTransportClientBuilder<'a> { ) -> Result<(Sender, Receiver), WsHandshakeError> { let mut target = self.target; let mut used_sockaddrs = Vec::new(); - + let origin = self.origin_header.map(|o| [o]); let mut err = None; for _ in 0..MAX_REDIRECTIONS_ALLOWED { @@ -225,8 +225,8 @@ impl<'a> WsTransportClientBuilder<'a> { &target.host_header, &target.path_and_query, ); - if let Some(origin) = self.origin_header.as_ref() { - client.set_origin(origin); + if let Some(origin) = &origin { + client.set_headers(origin); } // Perform the initial handshake. match client.handshake().await { diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index 489e5f2017..6e9612b8e6 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -16,7 +16,7 @@ jsonrpsee-types = { path = "../types", version = "0.3.0" } jsonrpsee-utils = { path = "../utils", version = "0.3.0", features = ["server"] } log = "0.4" serde_json = { version = "1", features = ["raw_value"] } -soketto = "0.6" +soketto = "0.7" tokio = { version = "1", features = ["net", "rt-multi-thread", "macros"] } tokio-util = { version = "0.6", features = ["compat"] } From bf0d71d9a057579de9c88de06a881cd2e3ed7e88 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Oct 2021 13:51:07 +0200 Subject: [PATCH 24/34] maybe fix windows test --- test-utils/Cargo.toml | 3 ++- test-utils/src/mocks.rs | 45 ++++++++++++++++++----------------------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index e4bfac7f27..6a8885c1ad 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -16,5 +16,6 @@ log = "0.4" serde = { version = "1", default-features = false, features = ["derive"] } serde_json = "1" soketto = { version = "0.7", features = ["http"] } +path-slash = "0.1.4" tokio = { version = "1", features = ["net", "rt-multi-thread", "macros", "time"] } -tokio-util = { version = "0.6", features = ["compat"] } +tokio-util = { version = "0.6", features = ["compat"] } \ No newline at end of file diff --git a/test-utils/src/mocks.rs b/test-utils/src/mocks.rs index 04aff7c384..95221749ee 100644 --- a/test-utils/src/mocks.rs +++ b/test-utils/src/mocks.rs @@ -33,11 +33,10 @@ use futures_util::{ sink::SinkExt, stream::{self, StreamExt}, }; +use path_slash::PathBufExt; use serde::{Deserialize, Serialize}; use soketto::handshake::{self, http::is_upgrade_request, server::Response, Error as SokettoError, Server}; -use std::io; -use std::net::SocketAddr; -use std::time::Duration; +use std::{io, net::SocketAddr, path::PathBuf, time::Duration}; use tokio::net::TcpStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; @@ -342,28 +341,24 @@ async fn handler( other_server: String, ) -> Result, soketto::BoxedError> { if is_upgrade_request(&req) { - match req.uri().to_string().as_str() { - "/myblock/two" => { - let response = hyper::Response::builder() - .status(301) - .header("Location", other_server) - .body(Body::empty()) - .unwrap(); - Ok(response) - } - "/myblock/one" => { - let response = - hyper::Response::builder().status(301).header("Location", "two").body(Body::empty()).unwrap(); - Ok(response) - } - _ => { - let response = hyper::Response::builder() - .status(301) - .header("Location", "/myblock/one") - .body(Body::empty()) - .unwrap(); - Ok(response) - } + let path = PathBuf::from_slash(req.uri().path()); + + if path == PathBuf::from_slash("/myblock/two") { + let response = + hyper::Response::builder().status(301).header("Location", other_server).body(Body::empty()).unwrap(); + Ok(response) + } else if path == PathBuf::from_slash("/myblock/one") { + let response = + hyper::Response::builder().status(301).header("Location", "two").body(Body::empty()).unwrap(); + Ok(response) + } else { + let path = PathBuf::from_slash("/myblock/one"); + let response = hyper::Response::builder() + .status(301) + .header("Location", path.to_str().expect("valid utf8 must be used")) + .body(Body::empty()) + .unwrap(); + Ok(response) } } else { panic!("expect upgrade to WS"); From 186bf843cfa4611c276e91bf3ec994a7b4ce73b0 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Oct 2021 14:36:40 +0200 Subject: [PATCH 25/34] add config flag for max redirections --- test-utils/src/mocks.rs | 1 + ws-client/src/client.rs | 9 ++ ws-client/src/transport.rs | 174 ++++++++++++++++++------------------- 3 files changed, 94 insertions(+), 90 deletions(-) diff --git a/test-utils/src/mocks.rs b/test-utils/src/mocks.rs index 95221749ee..f7c8f8dfab 100644 --- a/test-utils/src/mocks.rs +++ b/test-utils/src/mocks.rs @@ -341,6 +341,7 @@ async fn handler( other_server: String, ) -> Result, soketto::BoxedError> { if is_upgrade_request(&req) { + log::error!("{:?}", req.uri().path()); let path = PathBuf::from_slash(req.uri().path()); if path == PathBuf::from_slash("/myblock/two") { diff --git a/ws-client/src/client.rs b/ws-client/src/client.rs index 7f7ed1b773..66276c08d9 100644 --- a/ws-client/src/client.rs +++ b/ws-client/src/client.rs @@ -111,6 +111,7 @@ pub struct WsClientBuilder<'a> { origin_header: Option>, max_concurrent_requests: usize, max_notifs_per_subscription: usize, + max_redirections: usize, } impl<'a> Default for WsClientBuilder<'a> { @@ -123,6 +124,7 @@ impl<'a> Default for WsClientBuilder<'a> { origin_header: None, max_concurrent_requests: 256, max_notifs_per_subscription: 1024, + max_redirections: 10, } } } @@ -178,6 +180,12 @@ impl<'a> WsClientBuilder<'a> { self } + /// Set the max number of redirections to perform until a connection is regarded as failed. + pub fn max_redirections(mut self, redirect: usize) -> Self { + self.max_redirections = redirect; + self + } + /// Build the client with specified URL to connect to. /// If the port number is missing from the URL, the default port number is used. /// @@ -203,6 +211,7 @@ impl<'a> WsClientBuilder<'a> { timeout: self.connection_timeout, origin_header: self.origin_header, max_request_body_size: self.max_request_body_size, + max_redirections: self.max_redirections, }; let (sender, receiver) = builder.build().await.map_err(|e| Error::Transport(e.into()))?; diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index c9778f75ea..1ab1244f0c 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -40,8 +40,6 @@ use tokio_rustls::{ type TlsOrPlain = EitherStream>; -const MAX_REDIRECTIONS_ALLOWED: usize = 5; - /// Sending end of WebSocket transport. #[derive(Debug)] pub struct Sender { @@ -68,6 +66,8 @@ pub struct WsTransportClientBuilder<'a> { pub origin_header: Option>, /// Max payload size pub max_request_body_size: u32, + /// Max number of redirections. + pub max_redirections: usize, } /// Stream mode, either plain TCP or TLS. @@ -195,104 +195,98 @@ impl<'a> WsTransportClientBuilder<'a> { mut tls_connector: Option, ) -> Result<(Sender, Receiver), WsHandshakeError> { let mut target = self.target; - let mut used_sockaddrs = Vec::new(); let origin = self.origin_header.map(|o| [o]); let mut err = None; - for _ in 0..MAX_REDIRECTIONS_ALLOWED { + for _ in 0..self.max_redirections { // TODO(niklasad1): this should be debug. log::error!("Connecting to target: {:?}", target); - let sockaddr = match target.sockaddrs.pop() { - Some(addr) => { - used_sockaddrs.push(addr); - addr - } - None => return err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))), - }; - - let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await { - Ok(stream) => stream, - Err(e) => { - // TODO(niklasad1): this should be debug. - log::error!("Failed to connect to sockaddr: {:?}", sockaddr); - err = Some(Err(e)); - continue; - } - }; - let mut client = WsHandshakeClient::new( - BufReader::new(BufWriter::new(tcp_stream)), - &target.host_header, - &target.path_and_query, - ); - if let Some(origin) = &origin { - client.set_headers(origin); - } - // Perform the initial handshake. - match client.handshake().await { - Ok(ServerResponse::Accepted { .. }) => { - // TODO(niklasad1): this should be debug. - log::error!("Connection established to target: {:?}", target); - let mut builder = client.into_builder(); - builder.set_max_message_size(self.max_request_body_size as usize); - let (sender, receiver) = builder.finish(); - return Ok((Sender { inner: sender }, Receiver { inner: receiver })); - } - - Ok(ServerResponse::Rejected { status_code }) => { - // TODO(niklasad1): this should be debug. - log::error!("Connection rejected: {:?}", status_code); - err = Some(Err(WsHandshakeError::Rejected { status_code })); + for sockaddr in target.sockaddrs.clone() { + let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await { + Ok(stream) => stream, + Err(e) => { + // TODO(niklasad1): this should be debug. + log::error!("Failed to connect to sockaddr: {:?}", sockaddr); + err = Some(Err(e)); + continue; + } + }; + let mut client = WsHandshakeClient::new( + BufReader::new(BufWriter::new(tcp_stream)), + &target.host_header, + &target.path_and_query, + ); + if let Some(origin) = &origin { + client.set_headers(origin); } - Ok(ServerResponse::Redirect { status_code, location }) => { - log::trace!("Redirection: status_code: {}, location: {}", status_code, location); - match url::Url::parse(&location) { - // redirection with absolute path => need to lookup. - Ok(url) => { - target = Target::parse(url)?; - used_sockaddrs.clear(); - tls_connector = match target.mode { - Mode::Tls => { - let mut client_config = rustls::ClientConfig::default(); - if let CertificateStore::Native = self.certificate_store { - client_config.root_store = rustls_native_certs::load_native_certs() - .map_err(|(_, e)| WsHandshakeError::CertificateStore(e))?; + // Perform the initial handshake. + match client.handshake().await { + Ok(ServerResponse::Accepted { .. }) => { + // TODO(niklasad1): this should be debug. + log::error!("Connection established to target: {:?}", target); + let mut builder = client.into_builder(); + builder.set_max_message_size(self.max_request_body_size as usize); + let (sender, receiver) = builder.finish(); + return Ok((Sender { inner: sender }, Receiver { inner: receiver })); + } + + Ok(ServerResponse::Rejected { status_code }) => { + // TODO(niklasad1): this should be debug. + log::error!("Connection rejected: {:?}", status_code); + err = Some(Err(WsHandshakeError::Rejected { status_code })); + } + Ok(ServerResponse::Redirect { status_code, location }) => { + log::error!("Redirection: status_code: {}, location: {}", status_code, location); + match url::Url::parse(&location) { + // redirection with absolute path => need to lookup. + Ok(url) => { + target = Target::parse(url)?; + tls_connector = match target.mode { + Mode::Tls => { + let mut client_config = rustls::ClientConfig::default(); + if let CertificateStore::Native = self.certificate_store { + client_config.root_store = rustls_native_certs::load_native_certs() + .map_err(|(_, e)| WsHandshakeError::CertificateStore(e))?; + } + Some(Arc::new(client_config).into()) } - Some(Arc::new(client_config).into()) + Mode::Plain => None, + }; + break; + } + // redirection is relative, either `/baz` or `bar`. + Err( + url::ParseError::RelativeUrlWithoutBase + | url::ParseError::RelativeUrlWithCannotBeABaseBase, + ) => { + // replace the entire path if `location` is `/`. + if location.starts_with('/') { + target.path_and_query = location; + } else { + // join paths such that the leaf is replaced with `location`. + let strip_last_child = Path::new(&target.path_and_query) + .ancestors() + .nth(1) + .unwrap_or_else(|| Path::new("/")); + target.path_and_query = strip_last_child + .join(location) + .to_str() + .expect("valid UTF-8 checked by Url::parse; qed") + .to_string(); } - Mode::Plain => None, - }; - } - // redirection is relative, either `/baz` or `bar`. - Err( - url::ParseError::RelativeUrlWithoutBase | url::ParseError::RelativeUrlWithCannotBeABaseBase, - ) => { - // replace the entire path if `location` is `/`. - if location.starts_with('/') { - target.path_and_query = location; - } else { - // join paths such that the leaf is replaced with `location`. - let strip_last_child = Path::new(&target.path_and_query) - .ancestors() - .nth(1) - .unwrap_or_else(|| Path::new("/")); - target.path_and_query = strip_last_child - .join(location) - .to_str() - .expect("valid UTF-8 checked by Url::parse; qed") - .to_string(); + break; } - std::mem::swap(&mut target.sockaddrs, &mut used_sockaddrs); - } - Err(e) => { - err = Some(Err(WsHandshakeError::Url(e.to_string().into()))); - } - }; - } - Err(e) => { - err = Some(Err(e.into())); - } - }; + Err(e) => { + err = Some(Err(WsHandshakeError::Url(e.to_string().into()))); + } + }; + } + Err(e) => { + err = Some(Err(e.into())); + } + }; + } } err.unwrap_or(Err(WsHandshakeError::NoAddressFound(target.host))) } From efe1c28a1586975e7af8cf992455613dd4228c23 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Oct 2021 18:54:24 +0200 Subject: [PATCH 26/34] revert faulty change. Relative reference must start with either `/` or `//` --- test-utils/Cargo.toml | 1 - test-utils/src/mocks.rs | 48 +++++++++++++++++++++++------------------ 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 6a8885c1ad..3f159251a0 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -16,6 +16,5 @@ log = "0.4" serde = { version = "1", default-features = false, features = ["derive"] } serde_json = "1" soketto = { version = "0.7", features = ["http"] } -path-slash = "0.1.4" tokio = { version = "1", features = ["net", "rt-multi-thread", "macros", "time"] } tokio-util = { version = "0.6", features = ["compat"] } \ No newline at end of file diff --git a/test-utils/src/mocks.rs b/test-utils/src/mocks.rs index f7c8f8dfab..19ce908016 100644 --- a/test-utils/src/mocks.rs +++ b/test-utils/src/mocks.rs @@ -33,10 +33,9 @@ use futures_util::{ sink::SinkExt, stream::{self, StreamExt}, }; -use path_slash::PathBufExt; use serde::{Deserialize, Serialize}; use soketto::handshake::{self, http::is_upgrade_request, server::Response, Error as SokettoError, Server}; -use std::{io, net::SocketAddr, path::PathBuf, time::Duration}; +use std::{io, net::SocketAddr, path::Path, time::Duration}; use tokio::net::TcpStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; @@ -341,25 +340,32 @@ async fn handler( other_server: String, ) -> Result, soketto::BoxedError> { if is_upgrade_request(&req) { - log::error!("{:?}", req.uri().path()); - let path = PathBuf::from_slash(req.uri().path()); - - if path == PathBuf::from_slash("/myblock/two") { - let response = - hyper::Response::builder().status(301).header("Location", other_server).body(Body::empty()).unwrap(); - Ok(response) - } else if path == PathBuf::from_slash("/myblock/one") { - let response = - hyper::Response::builder().status(301).header("Location", "two").body(Body::empty()).unwrap(); - Ok(response) - } else { - let path = PathBuf::from_slash("/myblock/one"); - let response = hyper::Response::builder() - .status(301) - .header("Location", path.to_str().expect("valid utf8 must be used")) - .body(Body::empty()) - .unwrap(); - Ok(response) + let path = req.uri().path(); + log::error!("{:?}", path); + + match path { + "/myblock/two" => { + let response = hyper::Response::builder() + .status(301) + .header("Location", other_server) + .body(Body::empty()) + .unwrap(); + Ok(response) + } + "/myblock/one" => { + let response = + hyper::Response::builder().status(301).header("Location", "two").body(Body::empty()).unwrap(); + Ok(response) + } + _ => { + let path = Path::new("/myblock/one"); + let response = hyper::Response::builder() + .status(301) + .header("Location", path.to_str().expect("valid utf8; checked above")) + .body(Body::empty()) + .unwrap(); + Ok(response) + } } } else { panic!("expect upgrade to WS"); From e70842b1eb3a2c4b9ebe2fcb92265a0c80837b9c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 2 Oct 2021 19:35:02 +0200 Subject: [PATCH 27/34] revert windows path --- test-utils/src/mocks.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test-utils/src/mocks.rs b/test-utils/src/mocks.rs index 19ce908016..19fb5ebab7 100644 --- a/test-utils/src/mocks.rs +++ b/test-utils/src/mocks.rs @@ -358,10 +358,9 @@ async fn handler( Ok(response) } _ => { - let path = Path::new("/myblock/one"); let response = hyper::Response::builder() .status(301) - .header("Location", path.to_str().expect("valid utf8; checked above")) + .header("Location", "/myblock/one") .body(Body::empty()) .unwrap(); Ok(response) From 549e563c25b46bb21dd4fca96c631b08bd162232 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sun, 3 Oct 2021 13:22:21 +0200 Subject: [PATCH 28/34] use manual join paths --- test-utils/src/mocks.rs | 2 +- ws-client/Cargo.toml | 1 + ws-client/src/transport.rs | 73 ++++++++++++++++++++------------------ 3 files changed, 40 insertions(+), 36 deletions(-) diff --git a/test-utils/src/mocks.rs b/test-utils/src/mocks.rs index 19fb5ebab7..254944dda9 100644 --- a/test-utils/src/mocks.rs +++ b/test-utils/src/mocks.rs @@ -35,7 +35,7 @@ use futures_util::{ }; use serde::{Deserialize, Serialize}; use soketto::handshake::{self, http::is_upgrade_request, server::Response, Error as SokettoError, Server}; -use std::{io, net::SocketAddr, path::Path, time::Duration}; +use std::{io, net::SocketAddr, time::Duration}; use tokio::net::TcpStream; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; diff --git a/ws-client/Cargo.toml b/ws-client/Cargo.toml index 833de6231b..6f2c7ee058 100644 --- a/ws-client/Cargo.toml +++ b/ws-client/Cargo.toml @@ -25,6 +25,7 @@ soketto = "0.7" pin-project = "1" thiserror = "1" url = "2" +http = "0.2" rustls = "0.19.1" rustls-native-certs = "0.5.0" diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 1ab1244f0c..81e12b2b8e 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -26,9 +26,9 @@ use crate::{client::Header, stream::EitherStream}; use futures::io::{BufReader, BufWriter}; +use http::Uri; use soketto::connection; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; -use std::path::Path; use std::{borrow::Cow, io, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; use tokio::net::TcpStream; @@ -238,44 +238,47 @@ impl<'a> WsTransportClientBuilder<'a> { } Ok(ServerResponse::Redirect { status_code, location }) => { log::error!("Redirection: status_code: {}, location: {}", status_code, location); - match url::Url::parse(&location) { + // TODO(niklasad1): should query params still be used after redirection?! + match location.parse::() { // redirection with absolute path => need to lookup. - Ok(url) => { - target = Target::parse(url)?; - tls_connector = match target.mode { - Mode::Tls => { - let mut client_config = rustls::ClientConfig::default(); - if let CertificateStore::Native = self.certificate_store { - client_config.root_store = rustls_native_certs::load_native_certs() - .map_err(|(_, e)| WsHandshakeError::CertificateStore(e))?; + Ok(uri) => { + // Absolute URI. + if uri.scheme().is_some() { + // TODO(niklasad1): this is duplicated work + target = Target::parse(&location)?; + tls_connector = match target.mode { + Mode::Tls => { + let mut client_config = rustls::ClientConfig::default(); + if let CertificateStore::Native = self.certificate_store { + client_config.root_store = rustls_native_certs::load_native_certs() + .map_err(|(_, e)| WsHandshakeError::CertificateStore(e))?; + } + Some(Arc::new(client_config).into()) } - Some(Arc::new(client_config).into()) + Mode::Plain => None, + }; + break; + } + // Relative URI. + else { + // Replace the entire path_and_query if `location` starts with `/` or `//`. + if location.starts_with('/') { + target.path_and_query = location; + } else { + match target.path_and_query.rfind("/") { + Some(offset) => { + target.path_and_query.replace_range(offset + 1.., &location) + } + None => { + err = Some(Err(WsHandshakeError::Url( + "URI relative reference must contain `/` or `//`".to_owned().into(), + ))); + continue; + } + }; } - Mode::Plain => None, - }; - break; - } - // redirection is relative, either `/baz` or `bar`. - Err( - url::ParseError::RelativeUrlWithoutBase - | url::ParseError::RelativeUrlWithCannotBeABaseBase, - ) => { - // replace the entire path if `location` is `/`. - if location.starts_with('/') { - target.path_and_query = location; - } else { - // join paths such that the leaf is replaced with `location`. - let strip_last_child = Path::new(&target.path_and_query) - .ancestors() - .nth(1) - .unwrap_or_else(|| Path::new("/")); - target.path_and_query = strip_last_child - .join(location) - .to_str() - .expect("valid UTF-8 checked by Url::parse; qed") - .to_string(); + break; } - break; } Err(e) => { err = Some(Err(WsHandshakeError::Url(e.to_string().into()))); From df28dbdbfd5c0c21dfb494de06cf8713e11ad5de Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sun, 3 Oct 2021 15:32:20 +0200 Subject: [PATCH 29/34] remove url dep --- ws-client/Cargo.toml | 1 - ws-client/src/client.rs | 17 ++++--- ws-client/src/tests.rs | 2 +- ws-client/src/transport.rs | 92 +++++++++++++++++++------------------- 4 files changed, 54 insertions(+), 58 deletions(-) diff --git a/ws-client/Cargo.toml b/ws-client/Cargo.toml index 6f2c7ee058..bf030a5704 100644 --- a/ws-client/Cargo.toml +++ b/ws-client/Cargo.toml @@ -24,7 +24,6 @@ serde_json = "1" soketto = "0.7" pin-project = "1" thiserror = "1" -url = "2" http = "0.2" rustls = "0.19.1" rustls-native-certs = "0.5.0" diff --git a/ws-client/src/client.rs b/ws-client/src/client.rs index 66276c08d9..d5c4afc7b5 100644 --- a/ws-client/src/client.rs +++ b/ws-client/src/client.rs @@ -24,7 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::transport::{Receiver as WsReceiver, Sender as WsSender, Target, WsTransportClientBuilder}; +use crate::transport::{Receiver as WsReceiver, Sender as WsSender, WsHandshakeError, WsTransportClientBuilder}; use crate::types::{ traits::{Client, SubscriptionClient}, v2::{Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, RpcError, SubscriptionResponse}, @@ -46,9 +46,11 @@ use futures::{ prelude::*, sink::SinkExt, }; +use http::uri::{InvalidUri, Uri}; use tokio::sync::Mutex; use serde::de::DeserializeOwned; +use std::convert::TryInto; use std::time::Duration; pub use soketto::handshake::client::Header; @@ -187,17 +189,12 @@ impl<'a> WsClientBuilder<'a> { } /// Build the client with specified URL to connect to. - /// If the port number is missing from the URL, the default port number is used. - /// - /// - /// `ws://host` - port 80 is used - /// - /// `wss://host` - port 443 is used + /// You must provide the port number in the URL. /// /// ## Panics /// /// Panics if being called outside of `tokio` runtime context. - pub async fn build(self, url: &'a str) -> Result { + pub async fn build(self, uri: &'a str) -> Result { let certificate_store = self.certificate_store; let max_capacity_per_subscription = self.max_notifs_per_subscription; let max_concurrent_requests = self.max_concurrent_requests; @@ -205,9 +202,11 @@ impl<'a> WsClientBuilder<'a> { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_tx, err_rx) = oneshot::channel(); + let uri: Uri = uri.parse().map_err(|e: InvalidUri| Error::Transport(e.into()))?; + let builder = WsTransportClientBuilder { certificate_store, - target: Target::parse(url).map_err(|e| Error::Transport(e.into()))?, + target: uri.try_into().map_err(|e: WsHandshakeError| Error::Transport(e.into()))?, timeout: self.connection_timeout, origin_header: self.origin_header, max_request_body_size: self.max_request_body_size, diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index 6394580505..d41b5081a1 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -267,7 +267,7 @@ fn assert_error_response(err: Error, exp: ErrorObject) { //#[cfg_attr(target_os = "windows", ignore)] #[tokio::test] async fn redirections() { - env_logger::try_init(); + let _ = env_logger::try_init(); let expected = "abc 123"; let server = WebSocketTestServer::with_hardcoded_response( "127.0.0.1:0".parse().unwrap(), diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 81e12b2b8e..7931ecc645 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -29,7 +29,15 @@ use futures::io::{BufReader, BufWriter}; use http::Uri; use soketto::connection; use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; -use std::{borrow::Cow, io, net::SocketAddr, sync::Arc, time::Duration}; +use std::convert::TryInto; +use std::{ + borrow::Cow, + convert::TryFrom, + io, + net::{SocketAddr, ToSocketAddrs}, + sync::Arc, + time::Duration, +}; use thiserror::Error; use tokio::net::TcpStream; use tokio_rustls::{ @@ -199,15 +207,13 @@ impl<'a> WsTransportClientBuilder<'a> { let mut err = None; for _ in 0..self.max_redirections { - // TODO(niklasad1): this should be debug. - log::error!("Connecting to target: {:?}", target); + log::debug!("Connecting to target: {:?}", target); for sockaddr in target.sockaddrs.clone() { let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await { Ok(stream) => stream, Err(e) => { - // TODO(niklasad1): this should be debug. - log::error!("Failed to connect to sockaddr: {:?}", sockaddr); + log::debug!("Failed to connect to sockaddr: {:?}", sockaddr); err = Some(Err(e)); continue; } @@ -223,8 +229,7 @@ impl<'a> WsTransportClientBuilder<'a> { // Perform the initial handshake. match client.handshake().await { Ok(ServerResponse::Accepted { .. }) => { - // TODO(niklasad1): this should be debug. - log::error!("Connection established to target: {:?}", target); + log::info!("Connection established to target: {:?}", target); let mut builder = client.into_builder(); builder.set_max_message_size(self.max_request_body_size as usize); let (sender, receiver) = builder.finish(); @@ -232,20 +237,17 @@ impl<'a> WsTransportClientBuilder<'a> { } Ok(ServerResponse::Rejected { status_code }) => { - // TODO(niklasad1): this should be debug. - log::error!("Connection rejected: {:?}", status_code); + log::debug!("Connection rejected: {:?}", status_code); err = Some(Err(WsHandshakeError::Rejected { status_code })); } Ok(ServerResponse::Redirect { status_code, location }) => { - log::error!("Redirection: status_code: {}, location: {}", status_code, location); - // TODO(niklasad1): should query params still be used after redirection?! + log::debug!("Redirection: status_code: {}, location: {}", status_code, location); match location.parse::() { // redirection with absolute path => need to lookup. Ok(uri) => { // Absolute URI. if uri.scheme().is_some() { - // TODO(niklasad1): this is duplicated work - target = Target::parse(&location)?; + target = uri.try_into()?; tls_connector = match target.mode { Mode::Tls => { let mut client_config = rustls::ClientConfig::default(); @@ -361,34 +363,32 @@ pub struct Target { path_and_query: String, } -impl Target { - /// Parse an URL String to a WebSocket address. - pub fn parse(url: impl AsRef) -> Result { - let url = - url::Url::parse(url.as_ref()).map_err(|e| WsHandshakeError::Url(format!("Invalid URL: {}", e).into()))?; - let mode = match url.scheme() { - "ws" => Mode::Plain, - "wss" => Mode::Tls, +impl TryFrom for Target { + type Error = WsHandshakeError; + + fn try_from(uri: Uri) -> Result { + let mode = match uri.scheme_str() { + Some("ws") => Mode::Plain, + Some("wss") => Mode::Tls, _ => return Err(WsHandshakeError::Url("URL scheme not supported, expects 'ws' or 'wss'".into())), }; - let host = - url.host_str().map(ToOwned::to_owned).ok_or_else(|| WsHandshakeError::Url("No host in URL".into()))?; - let port = url.port_or_known_default().ok_or_else(|| WsHandshakeError::Url("No port number in URL".into()))?; + let host = uri.host().map(ToOwned::to_owned).ok_or_else(|| WsHandshakeError::Url("No host in URL".into()))?; + let port = uri + .port_u16() + .ok_or_else(|| WsHandshakeError::Url("No port number in URL (default port is not supported)".into()))?; let host_header = format!("{}:{}", host, port); - let mut path_and_query = url.path().to_owned(); - if let Some(query) = url.query() { - path_and_query.push('?'); - path_and_query.push_str(query); - } - // NOTE: `Url::socket_addrs` is using the default port if it's missing (ws:// - 80, wss:// - 443) - let sockaddrs = url.socket_addrs(|| None).map_err(WsHandshakeError::ResolutionFailed)?; - Ok(Self { sockaddrs, host, host_header, mode, path_and_query }) + let parts = uri.into_parts(); + let path_and_query = parts.path_and_query.ok_or_else(|| WsHandshakeError::Url("No path in URL".into()))?; + let sockaddrs = host_header.to_socket_addrs().map_err(WsHandshakeError::ResolutionFailed)?; + Ok(Self { sockaddrs: sockaddrs.collect(), host, host_header, mode, path_and_query: path_and_query.to_string() }) } } #[cfg(test)] mod tests { - use super::{Mode, Target, WsHandshakeError}; + use super::{Mode, Target, Uri, WsHandshakeError}; + use http::uri::InvalidUri; + use std::convert::TryInto; fn assert_ws_target(target: Target, host: &str, host_header: &str, mode: Mode, path_and_query: &str) { assert_eq!(&target.host, host); @@ -397,53 +397,51 @@ mod tests { assert_eq!(&target.path_and_query, path_and_query); } + fn parse_target(uri: &str) -> Result { + uri.parse::().map_err(|e: InvalidUri| WsHandshakeError::Url(e.to_string().into()))?.try_into() + } + #[test] fn ws_works() { - let target = Target::parse("ws://127.0.0.1:9933").unwrap(); + let target = parse_target("ws://127.0.0.1:9933").unwrap(); assert_ws_target(target, "127.0.0.1", "127.0.0.1:9933", Mode::Plain, "/"); } #[test] fn wss_works() { - let target = Target::parse("wss://kusama-rpc.polkadot.io:443").unwrap(); + let target = parse_target("wss://kusama-rpc.polkadot.io:443").unwrap(); assert_ws_target(target, "kusama-rpc.polkadot.io", "kusama-rpc.polkadot.io:443", Mode::Tls, "/"); } #[test] fn faulty_url_scheme() { - let err = Target::parse("http://kusama-rpc.polkadot.io:443").unwrap_err(); + let err = parse_target("http://kusama-rpc.polkadot.io:443").unwrap_err(); assert!(matches!(err, WsHandshakeError::Url(_))); } #[test] fn faulty_port() { - let err = Target::parse("ws://127.0.0.1:-43").unwrap_err(); + let err = parse_target("ws://127.0.0.1:-43").unwrap_err(); assert!(matches!(err, WsHandshakeError::Url(_))); - let err = Target::parse("ws://127.0.0.1:99999").unwrap_err(); + let err = parse_target("ws://127.0.0.1:99999").unwrap_err(); assert!(matches!(err, WsHandshakeError::Url(_))); } - #[test] - fn default_port_works() { - let target = Target::parse("ws://127.0.0.1").unwrap(); - assert_ws_target(target, "127.0.0.1", "127.0.0.1:80", Mode::Plain, "/"); - } - #[test] fn url_with_path_works() { - let target = Target::parse("wss://127.0.0.1/my-special-path").unwrap(); + let target = parse_target("wss://127.0.0.1:443/my-special-path").unwrap(); assert_ws_target(target, "127.0.0.1", "127.0.0.1:443", Mode::Tls, "/my-special-path"); } #[test] fn url_with_query_works() { - let target = Target::parse("wss://127.0.0.1/my?name1=value1&name2=value2").unwrap(); + let target = parse_target("wss://127.0.0.1:443/my?name1=value1&name2=value2").unwrap(); assert_ws_target(target, "127.0.0.1", "127.0.0.1:443", Mode::Tls, "/my?name1=value1&name2=value2"); } #[test] fn url_with_fragment_is_ignored() { - let target = Target::parse("wss://127.0.0.1/my.htm#ignore").unwrap(); + let target = parse_target("wss://127.0.0.1:443/my.htm#ignore").unwrap(); assert_ws_target(target, "127.0.0.1", "127.0.0.1:443", Mode::Tls, "/my.htm"); } } From 4b5065cf2ee518f02b69abf60c86c1cd0ad041c3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sun, 3 Oct 2021 18:47:05 +0200 Subject: [PATCH 30/34] Update ws-client/src/tests.rs --- ws-client/src/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ws-client/src/tests.rs b/ws-client/src/tests.rs index d41b5081a1..b2b0261826 100644 --- a/ws-client/src/tests.rs +++ b/ws-client/src/tests.rs @@ -264,7 +264,6 @@ fn assert_error_response(err: Error, exp: ErrorObject) { }; } -//#[cfg_attr(target_os = "windows", ignore)] #[tokio::test] async fn redirections() { let _ = env_logger::try_init(); From 93d0c3bc7c949bb818711d86b2484bea8bbe0438 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 4 Oct 2021 20:21:03 +0200 Subject: [PATCH 31/34] default max redirects 5 --- ws-client/src/client.rs | 2 +- ws-client/src/transport.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ws-client/src/client.rs b/ws-client/src/client.rs index d5c4afc7b5..8651a2ee78 100644 --- a/ws-client/src/client.rs +++ b/ws-client/src/client.rs @@ -126,7 +126,7 @@ impl<'a> Default for WsClientBuilder<'a> { origin_header: None, max_concurrent_requests: 256, max_notifs_per_subscription: 1024, - max_redirections: 10, + max_redirections: 5, } } } diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 7931ecc645..3a8fd7dfe6 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -267,7 +267,7 @@ impl<'a> WsTransportClientBuilder<'a> { if location.starts_with('/') { target.path_and_query = location; } else { - match target.path_and_query.rfind("/") { + match target.path_and_query.rfind('/') { Some(offset) => { target.path_and_query.replace_range(offset + 1.., &location) } From 0488a2a6180bced1f2dc67524b850eb7dae379eb Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 5 Oct 2021 11:43:34 +0200 Subject: [PATCH 32/34] remove needless clone vec --- test-utils/src/mocks.rs | 5 ++--- ws-client/src/transport.rs | 13 ++++++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/test-utils/src/mocks.rs b/test-utils/src/mocks.rs index 254944dda9..c3bb183ba0 100644 --- a/test-utils/src/mocks.rs +++ b/test-utils/src/mocks.rs @@ -340,10 +340,9 @@ async fn handler( other_server: String, ) -> Result, soketto::BoxedError> { if is_upgrade_request(&req) { - let path = req.uri().path(); - log::error!("{:?}", path); + log::debug!("{:?}", req); - match path { + match req.uri().path() { "/myblock/two" => { let response = hyper::Response::builder() .status(301) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index 3a8fd7dfe6..80602607d5 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -209,8 +209,10 @@ impl<'a> WsTransportClientBuilder<'a> { for _ in 0..self.max_redirections { log::debug!("Connecting to target: {:?}", target); - for sockaddr in target.sockaddrs.clone() { - let tcp_stream = match connect(sockaddr, self.timeout, &target.host, &tls_connector).await { + // The sockaddrs might get reused if the server replies with a URI relative resource. + let sockaddrs = std::mem::take(&mut target.sockaddrs); + for sockaddr in &sockaddrs { + let tcp_stream = match connect(*sockaddr, self.timeout, &target.host, &tls_connector).await { Ok(stream) => stream, Err(e) => { log::debug!("Failed to connect to sockaddr: {:?}", sockaddr); @@ -273,12 +275,17 @@ impl<'a> WsTransportClientBuilder<'a> { } None => { err = Some(Err(WsHandshakeError::Url( - "URI relative reference must contain `/` or `//`".to_owned().into(), + format!( + "path_and_query: {}; this is a bug it must contain `/` please open issue", + location + ) + .into(), ))); continue; } }; } + target.sockaddrs = sockaddrs; break; } } From 223c37cfc757d4885381e7d541f6dc3a2dabcb52 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 5 Oct 2021 16:44:26 +0200 Subject: [PATCH 33/34] fix bad merge --- ws-client/src/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index dae7fae537..a5c3b4a484 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -258,7 +258,7 @@ impl<'a> WsTransportClientBuilder<'a> { target = uri.try_into()?; tls_connector = match target.mode { Mode::Tls => { - let mut client_config = rustls::ClientConfig::default(); + let mut client_config = ClientConfig::default(); if let CertificateStore::Native = self.certificate_store { client_config.root_store = rustls_native_certs::load_native_certs() .map_err(|(_, e)| WsHandshakeError::CertificateStore(e))?; From ae1e40551370e9cc85c210ae9c3d3256cfc545c6 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 5 Oct 2021 16:52:26 +0200 Subject: [PATCH 34/34] cmon CI run --- ws-client/src/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-client/src/transport.rs b/ws-client/src/transport.rs index a5c3b4a484..e18f34229f 100644 --- a/ws-client/src/transport.rs +++ b/ws-client/src/transport.rs @@ -215,7 +215,7 @@ impl<'a> WsTransportClientBuilder<'a> { for _ in 0..self.max_redirections { log::debug!("Connecting to target: {:?}", target); - // The sockaddrs might get reused if the server replies with a URI relative resource. + // The sockaddrs might get reused if the server replies with a relative URI. let sockaddrs = std::mem::take(&mut target.sockaddrs); for sockaddr in &sockaddrs { let tcp_stream = match connect(*sockaddr, self.timeout, &target.host, &tls_connector).await {