From f3eb4cdf350eb1f83b5ec478be93321631adad9a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Nov 2023 20:40:39 +0200 Subject: [PATCH 01/10] lightclient: Close wasm socket while dropping from connecting state Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 3d212a3506..1d2efbd23c 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -230,7 +230,7 @@ impl Drop for WasmSocket { fn drop(&mut self) { let inner = self.inner.lock().expect("Mutex is poised; qed"); - if inner.state == ConnectionState::Opened { + if inner.state == ConnectionState::Opened || inner.state == ConnectionState::Connecting { let _ = self.socket.close(); } } From cfac2831e274c1b4414ba9688de35f5cbcf0d57a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 13:54:14 +0200 Subject: [PATCH 02/10] lightclient: Construct one time only closures Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 1d2efbd23c..81ac605fc0 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -63,10 +63,10 @@ struct InnerWasmSocket { /// /// These need to be kept around until the socket is dropped. type Callbacks = ( - Closure, + JsValue, Closure, - Closure, - Closure, + JsValue, + JsValue, ); impl WasmSocket { @@ -89,7 +89,7 @@ impl WasmSocket { waker: None, })); - let open_callback = Closure::::new({ + let open_callback = Closure::once_into_js({ let inner = inner.clone(); move || { let mut inner = inner.lock().expect("Mutex is poised; qed"); @@ -120,9 +120,9 @@ impl WasmSocket { }); socket.set_onmessage(Some(message_callback.as_ref().unchecked_ref())); - let error_callback = Closure::::new({ + let error_callback = Closure::once_into_js({ let inner = inner.clone(); - move |_| { + move |_event: web_sys::Event| { // Callback does not provide useful information, signal it back to the stream. let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Error; @@ -134,9 +134,9 @@ impl WasmSocket { }); socket.set_onerror(Some(error_callback.as_ref().unchecked_ref())); - let close_callback = Closure::::new({ + let close_callback = Closure::once_into_js({ let inner = inner.clone(); - move |_| { + move |_event: web_sys::CloseEvent| { let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Closed; From 860f0e006f161b104fd038a335bce84d7dbe1197 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 13:54:33 +0200 Subject: [PATCH 03/10] testing: Enable console logs for lightclient WASM testing Signed-off-by: Alexandru Vasile --- testing/wasm-lightclient-tests/tests/wasm.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/wasm-lightclient-tests/tests/wasm.rs b/testing/wasm-lightclient-tests/tests/wasm.rs index ed3ee77a70..04018babeb 100644 --- a/testing/wasm-lightclient-tests/tests/wasm.rs +++ b/testing/wasm-lightclient-tests/tests/wasm.rs @@ -32,6 +32,8 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); #[wasm_bindgen_test] async fn light_client_works() { + console_error_panic_hook::set_once(); + let api: LightClient = LightClientBuilder::new() .build_from_url("wss://rpc.polkadot.io:443") .await From 866094001d4c0b119a80ed681a74b323f74eae1b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 14:41:21 +0200 Subject: [PATCH 04/10] lightclient: Separate wakes and check connectivity on poll_read Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 53 ++++++++++++++++++------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 81ac605fc0..35eec32962 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -55,8 +55,15 @@ struct InnerWasmSocket { state: ConnectionState, /// Data buffer for the socket. data: VecDeque, - /// Waker from `poll_read` / `poll_write`. - waker: Option, + + /// Waker from `poll_read` when the socket is not connected yet. + open_waker: Option, + /// Waker from `poll_read`. + read_waker: Option, + /// Waker from `poll_write` and `poll_flush`. + write_waker: Option, + /// Waker from `poll_close`. + close_waker: Option, } /// Registered callbacks of the [`WasmSocket`]. @@ -86,7 +93,10 @@ impl WasmSocket { let inner = Arc::new(Mutex::new(InnerWasmSocket { state: ConnectionState::Connecting, data: VecDeque::with_capacity(16384), - waker: None, + open_waker: None, + read_waker: None, + write_waker: None, + close_waker: None, })); let open_callback = Closure::once_into_js({ @@ -95,7 +105,7 @@ impl WasmSocket { let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Opened; - if let Some(waker) = inner.waker.take() { + if let Some(waker) = inner.open_waker.take() { waker.wake(); } } @@ -113,7 +123,7 @@ impl WasmSocket { let bytes = js_sys::Uint8Array::new(&buffer).to_vec(); inner.data.extend(bytes.into_iter()); - if let Some(waker) = inner.waker.take() { + if let Some(waker) = inner.read_waker.take() { waker.wake(); } } @@ -126,10 +136,6 @@ impl WasmSocket { // Callback does not provide useful information, signal it back to the stream. let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Error; - - if let Some(waker) = inner.waker.take() { - waker.wake(); - } } }); socket.set_onerror(Some(error_callback.as_ref().unchecked_ref())); @@ -140,7 +146,7 @@ impl WasmSocket { let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Closed; - if let Some(waker) = inner.waker.take() { + if let Some(waker) = inner.close_waker.take() { waker.wake(); } } @@ -169,8 +175,17 @@ impl AsyncRead for WasmSocket { buf: &mut [u8], ) -> Poll> { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); - inner.waker = Some(cx.waker().clone()); + // Check if the socket is ready for reading. + let state = self.socket.ready_state(); + if state == web_sys::WebSocket::CLOSED || state == web_sys::WebSocket::CLOSING { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } else if state == web_sys::WebSocket::CONNECTING { + inner.open_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + + inner.read_waker = Some(cx.waker().clone()); match inner.state { ConnectionState::Error => { Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error"))) @@ -199,7 +214,7 @@ impl AsyncWrite for WasmSocket { buf: &[u8], ) -> Poll> { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); - inner.waker = Some(cx.waker().clone()); + inner.write_waker = Some(cx.waker().clone()); match inner.state { ConnectionState::Error => { @@ -221,8 +236,18 @@ impl AsyncWrite for WasmSocket { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.socket.ready_state() == web_sys::WebSocket::CLOSED { + return Poll::Ready(Ok(())); + } + + if self.socket.ready_state() != web_sys::WebSocket::CLOSING { + let _ = self.socket.close(); + } + + let mut inner = self.inner.lock().expect("Mutex is poised; qed"); + inner.close_waker = Some(cx.waker().clone()); + Poll::Pending } } From b2e6d4108c1a6d471a1388c4dca1ae590f93f159 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 14:42:08 +0200 Subject: [PATCH 05/10] lightclient: Close the socket depending on internal state Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 35eec32962..816d5e8fa1 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -253,9 +253,7 @@ impl AsyncWrite for WasmSocket { impl Drop for WasmSocket { fn drop(&mut self) { - let inner = self.inner.lock().expect("Mutex is poised; qed"); - - if inner.state == ConnectionState::Opened || inner.state == ConnectionState::Connecting { + if self.socket.ready_state() == web_sys::WebSocket::OPEN { let _ = self.socket.close(); } } From 78f4d88194083a7b5855f7a26d4a434bca20a9e3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 14:56:12 +0200 Subject: [PATCH 06/10] Revert "lightclient: Separate wakes and check connectivity on poll_read" This reverts commit 866094001d4c0b119a80ed681a74b323f74eae1b. --- lightclient/src/platform/wasm_socket.rs | 53 +++++++------------------ 1 file changed, 14 insertions(+), 39 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 816d5e8fa1..2abeb0003b 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -55,15 +55,8 @@ struct InnerWasmSocket { state: ConnectionState, /// Data buffer for the socket. data: VecDeque, - - /// Waker from `poll_read` when the socket is not connected yet. - open_waker: Option, - /// Waker from `poll_read`. - read_waker: Option, - /// Waker from `poll_write` and `poll_flush`. - write_waker: Option, - /// Waker from `poll_close`. - close_waker: Option, + /// Waker from `poll_read` / `poll_write`. + waker: Option, } /// Registered callbacks of the [`WasmSocket`]. @@ -93,10 +86,7 @@ impl WasmSocket { let inner = Arc::new(Mutex::new(InnerWasmSocket { state: ConnectionState::Connecting, data: VecDeque::with_capacity(16384), - open_waker: None, - read_waker: None, - write_waker: None, - close_waker: None, + waker: None, })); let open_callback = Closure::once_into_js({ @@ -105,7 +95,7 @@ impl WasmSocket { let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Opened; - if let Some(waker) = inner.open_waker.take() { + if let Some(waker) = inner.waker.take() { waker.wake(); } } @@ -123,7 +113,7 @@ impl WasmSocket { let bytes = js_sys::Uint8Array::new(&buffer).to_vec(); inner.data.extend(bytes.into_iter()); - if let Some(waker) = inner.read_waker.take() { + if let Some(waker) = inner.waker.take() { waker.wake(); } } @@ -136,6 +126,10 @@ impl WasmSocket { // Callback does not provide useful information, signal it back to the stream. let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Error; + + if let Some(waker) = inner.waker.take() { + waker.wake(); + } } }); socket.set_onerror(Some(error_callback.as_ref().unchecked_ref())); @@ -146,7 +140,7 @@ impl WasmSocket { let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Closed; - if let Some(waker) = inner.close_waker.take() { + if let Some(waker) = inner.waker.take() { waker.wake(); } } @@ -175,17 +169,8 @@ impl AsyncRead for WasmSocket { buf: &mut [u8], ) -> Poll> { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); + inner.waker = Some(cx.waker().clone()); - // Check if the socket is ready for reading. - let state = self.socket.ready_state(); - if state == web_sys::WebSocket::CLOSED || state == web_sys::WebSocket::CLOSING { - return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); - } else if state == web_sys::WebSocket::CONNECTING { - inner.open_waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - inner.read_waker = Some(cx.waker().clone()); match inner.state { ConnectionState::Error => { Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error"))) @@ -214,7 +199,7 @@ impl AsyncWrite for WasmSocket { buf: &[u8], ) -> Poll> { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); - inner.write_waker = Some(cx.waker().clone()); + inner.waker = Some(cx.waker().clone()); match inner.state { ConnectionState::Error => { @@ -236,18 +221,8 @@ impl AsyncWrite for WasmSocket { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.socket.ready_state() == web_sys::WebSocket::CLOSED { - return Poll::Ready(Ok(())); - } - - if self.socket.ready_state() != web_sys::WebSocket::CLOSING { - let _ = self.socket.close(); - } - - let mut inner = self.inner.lock().expect("Mutex is poised; qed"); - inner.close_waker = Some(cx.waker().clone()); - Poll::Pending + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } From 1cdee8f7c1822688206afd3dba4bf169af2896b6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 14:58:56 +0200 Subject: [PATCH 07/10] lightclient: Return pending if socket is opening from poll_read Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 2abeb0003b..657eb86cde 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -171,6 +171,10 @@ impl AsyncRead for WasmSocket { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); inner.waker = Some(cx.waker().clone()); + if self.socket.ready_state() == web_sys::WebSocket::Connecting { + return Poll::Pending; + } + match inner.state { ConnectionState::Error => { Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error"))) From bd138797b3f76fc5e1d7c252719c14da44347e6f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 15:00:29 +0200 Subject: [PATCH 08/10] lightclient: Close the socket on `poll_close` Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 657eb86cde..d4c94694cb 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -171,7 +171,7 @@ impl AsyncRead for WasmSocket { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); inner.waker = Some(cx.waker().clone()); - if self.socket.ready_state() == web_sys::WebSocket::Connecting { + if self.socket.ready_state() == web_sys::WebSocket::CONNECTING { return Poll::Pending; } @@ -225,8 +225,18 @@ impl AsyncWrite for WasmSocket { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.socket.ready_state() == web_sys::WebSocket::CLOSED { + return Poll::Ready(Ok(())); + } + + if self.socket.ready_state() != web_sys::WebSocket::CLOSING { + let _ = self.socket.close(); + } + + let mut inner = self.inner.lock().expect("Mutex is poised; qed"); + inner.waker = Some(cx.waker().clone()); + Poll::Pending } } From 099f6ee1a913eb52b0e4ff41be0e5b3b85abca48 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 27 Nov 2023 14:59:25 +0200 Subject: [PATCH 09/10] lightclient: Reset closures on Drop to avoid recursive invokation Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index d4c94694cb..b189ded963 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -63,10 +63,10 @@ struct InnerWasmSocket { /// /// These need to be kept around until the socket is dropped. type Callbacks = ( - JsValue, + Closure, Closure, - JsValue, - JsValue, + Closure, + Closure, ); impl WasmSocket { @@ -89,7 +89,7 @@ impl WasmSocket { waker: None, })); - let open_callback = Closure::once_into_js({ + let open_callback = Closure::::new({ let inner = inner.clone(); move || { let mut inner = inner.lock().expect("Mutex is poised; qed"); @@ -120,7 +120,7 @@ impl WasmSocket { }); socket.set_onmessage(Some(message_callback.as_ref().unchecked_ref())); - let error_callback = Closure::once_into_js({ + let error_callback = Closure::::new({ let inner = inner.clone(); move |_event: web_sys::Event| { // Callback does not provide useful information, signal it back to the stream. @@ -134,7 +134,7 @@ impl WasmSocket { }); socket.set_onerror(Some(error_callback.as_ref().unchecked_ref())); - let close_callback = Closure::once_into_js({ + let close_callback = Closure::::new({ let inner = inner.clone(); move |_event: web_sys::CloseEvent| { let mut inner = inner.lock().expect("Mutex is poised; qed"); @@ -242,8 +242,15 @@ impl AsyncWrite for WasmSocket { impl Drop for WasmSocket { fn drop(&mut self) { - if self.socket.ready_state() == web_sys::WebSocket::OPEN { + if self.socket.ready_state() == web_sys::WebSocket::OPEN + || self.socket.ready_state() == web_sys::WebSocket::CONNECTING + { let _ = self.socket.close(); } + + self.socket.set_onopen(None); + self.socket.set_onmessage(None); + self.socket.set_onerror(None); + self.socket.set_onclose(None); } } From abe77b51969a4cdc6fb74d13bc873d5aa517d2cd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 27 Nov 2023 15:23:14 +0200 Subject: [PATCH 10/10] lightclient: Close the socket if not already closing Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index b189ded963..abe67e7c2d 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -242,9 +242,7 @@ impl AsyncWrite for WasmSocket { impl Drop for WasmSocket { fn drop(&mut self) { - if self.socket.ready_state() == web_sys::WebSocket::OPEN - || self.socket.ready_state() == web_sys::WebSocket::CONNECTING - { + if self.socket.ready_state() != web_sys::WebSocket::CLOSING { let _ = self.socket.close(); }