Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ws server): close all subscription when the connection is closed #725

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
}

#[tokio::test]
async fn ws_server_cancels_stream_after_reset_conn() {
async fn ws_server_cancels_subscriptions_on_reset_conn() {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
Expand All @@ -393,34 +393,40 @@ async fn ws_server_cancels_stream_after_reset_conn() {
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let server_url = format!("ws://{}", server.local_addr().unwrap());

let (tx, mut rx) = mpsc::channel(1);
let (tx, rx) = mpsc::channel(1);
let mut module = RpcModule::new(tx);

module
.register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| {
// create stream that doesn't produce items.
let stream = futures::stream::empty::<usize>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test was flaky I don't know what I was thinking.

Copy link
Member Author

@niklasad1 niklasad1 Apr 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further, this could complete for notify_one successfully if the receiver from the connection is closed once the a new item is produced on the stream and is tried to be sent to the subscriber.

so really tricky test to but I added sleep for one hour so should be "okeyisch" really tricky to test this.

.register_subscription("subscribe_for_ever", "n", "unsubscribe_for_ever", |_, sink, mut tx| {
// Create stream that produce one item then sleeps for an hour.
let interval = interval(Duration::from_secs(60 * 60));
let stream = IntervalStream::new(interval).map(move |_| 0_usize);

tokio::spawn(async move {
sink.pipe_from_stream(stream).await.unwrap();
let send_back = Arc::make_mut(&mut tx);
send_back.feed(()).await.unwrap();
send_back.send(()).await.unwrap();
});

Ok(())
})
.unwrap();

server.start(module).unwrap();

let client = WsClientBuilder::default().build(&server_url).await.unwrap();
let _sub1: Subscription<usize> =
client.subscribe("subscribe_never_produce", None, "unsubscribe_never_produce").await.unwrap();
let _sub2: Subscription<usize> =
client.subscribe("subscribe_never_produce", None, "unsubscribe_never_produce").await.unwrap();
let mut subs = Vec::new();

for _ in 0..10 {
subs.push(client.subscribe::<usize>("subscribe_for_ever", None, "unsubscribe_for_ever").await.unwrap());
}

// terminate connection.
drop(client);
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped");
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped");

let rx_len = rx.take(10).fold(0, |acc, _| async move { acc + 1 }).await;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a chance that running drop(client); right after the subscribing might mean that we don't get all 10 items back from it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could a few milliseconds to send the WS close message but so not really is my understanding rx here is another channel where tx is encapsulated in the RpcModule so it's not the subscription stream

Then after connection is closed the actual subscriptions should be terminated using Notify::notify_waiters after that this messages and sent to channel above...


assert_eq!(rx_len, 10);
}

#[tokio::test]
Expand Down
3 changes: 1 addition & 2 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,7 @@ async fn background_task(
// Terminate connection and send close message.
let _ = sender.close().await;

// Force `conn_tx` to this async block and close it down
// when the connection closes to be on safe side.
// Notify all listeners and close down associated tasks.
close_notify_server_stop.notify_one();
});

Expand Down