Skip to content

Commit

Permalink
test(engineio): mock http/ws connections and improve integration tests (
Browse files Browse the repository at this point in the history
#447)

* test(engineio): mock http/ws connections

* test(engineio): fix rx to stream adapter
  • Loading branch information
Totodore authored Jan 20, 2025
1 parent f486c7a commit 1dc49e5
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 87 deletions.
3 changes: 2 additions & 1 deletion crates/engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ tracing-subscriber.workspace = true
hyper = { workspace = true, features = ["server", "http1"] }
criterion.workspace = true
axum.workspace = true
hyper-util = { workspace = true, features = ["tokio", "client-legacy"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"], default-features = false }

[features]
v3 = ["memchr", "unicode-segmentation", "itoa"]
Expand Down
23 changes: 23 additions & 0 deletions crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,29 @@ where
}
}

#[cfg(feature = "__test_harness")]
#[doc(hidden)]
impl<H, Svc> EngineIoService<H, Svc>
where
H: EngineIoHandler,
{
/// Create a new engine.io conn over websocket through a raw stream.
/// Mostly used for testing.
pub fn ws_init<S>(
&self,
conn: S,
protocol: ProtocolVersion,
sid: Option<crate::sid::Sid>,
req_data: http::request::Parts,
) -> impl std::future::Future<Output = Result<(), crate::errors::Error>>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let engine = self.engine.clone();
crate::transport::ws::on_init(engine, conn, protocol, sid, req_data)
}
}

/// A MakeService that always returns a clone of the [`EngineIoService`] it was created with.
pub struct MakeEngineIoService<H: EngineIoHandler, S> {
svc: EngineIoService<H, S>,
Expand Down
2 changes: 1 addition & 1 deletion crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub fn new_req<R: Send + 'static, B, H: EngineIoHandler>(
/// Sends an open packet if it is not an upgrade from a polling request
///
/// Read packets from the websocket and handle them, it will block until the connection is closed
async fn on_init<H: EngineIoHandler, S>(
pub async fn on_init<H: EngineIoHandler, S>(
engine: Arc<EngineIo<H>>,
conn: S,
protocol: ProtocolVersion,
Expand Down
42 changes: 21 additions & 21 deletions crates/engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use tokio::sync::mpsc;

mod fixture;

use fixture::{create_server, send_req};
use fixture::{create_server, create_ws_connection, send_req};
use tokio_tungstenite::tungstenite::Message;

use crate::fixture::{create_polling_connection, create_ws_connection};
use crate::fixture::create_polling_connection;

#[derive(Debug, Clone)]
struct MyHandler {
Expand Down Expand Up @@ -53,8 +53,8 @@ impl EngineIoHandler for MyHandler {
#[tokio::test]
pub async fn polling_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1234).await;
create_polling_connection(1234).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
create_polling_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -67,8 +67,8 @@ pub async fn polling_heartbeat_timeout() {
#[tokio::test]
pub async fn ws_heartbeat_timeout() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12344).await;
let _stream = create_ws_connection(12344).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let _stream = create_ws_connection(&mut svc).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
Expand All @@ -81,11 +81,11 @@ pub async fn ws_heartbeat_timeout() {
#[tokio::test]
pub async fn polling_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1235).await;
let sid = create_polling_connection(1235).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;

send_req(
1235,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("1".into()),
Expand All @@ -103,8 +103,8 @@ pub async fn polling_transport_closed() {
#[tokio::test]
pub async fn ws_transport_closed() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12345).await;
let mut stream = create_ws_connection(12345).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;

stream.send(Message::Text("1".into())).await.unwrap();

Expand All @@ -119,10 +119,10 @@ pub async fn ws_transport_closed() {
#[tokio::test]
pub async fn multiple_http_polling() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1236).await;
let sid = create_polling_connection(1236).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -131,13 +131,13 @@ pub async fn multiple_http_polling() {

tokio::spawn(futures_util::future::join_all(vec![
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
send_req(
1236,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
Expand All @@ -155,10 +155,10 @@ pub async fn multiple_http_polling() {
#[tokio::test]
pub async fn polling_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 1237).await;
let sid = create_polling_connection(1237).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let sid = create_polling_connection(&mut svc).await;
send_req(
1237,
&mut svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("aizdunazidaubdiz".into()),
Expand All @@ -176,8 +176,8 @@ pub async fn polling_packet_parsing() {
#[tokio::test]
pub async fn ws_packet_parsing() {
let (disconnect_tx, mut rx) = mpsc::channel(10);
create_server(MyHandler { disconnect_tx }, 12347).await;
let mut stream = create_ws_connection(12347).await;
let mut svc = create_server(MyHandler { disconnect_tx }).await;
let mut stream = create_ws_connection(&mut svc).await;
stream
.send(Message::Text("aizdunazidaubdiz".into()))
.await
Expand Down
Loading

0 comments on commit 1dc49e5

Please sign in to comment.