-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ws server): close all subscription when the connection is closed #725
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -381,7 +381,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() { | |
} | ||
|
||
#[tokio::test] | ||
async fn ws_server_cancels_stream_after_reset_conn() { | ||
async fn ws_server_cancels_subscriptions_on_reset_conn() { | ||
tracing_subscriber::FmtSubscriber::builder() | ||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) | ||
.try_init() | ||
|
@@ -393,34 +393,40 @@ async fn ws_server_cancels_stream_after_reset_conn() { | |
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap(); | ||
let server_url = format!("ws://{}", server.local_addr().unwrap()); | ||
|
||
let (tx, mut rx) = mpsc::channel(1); | ||
let (tx, rx) = mpsc::channel(1); | ||
let mut module = RpcModule::new(tx); | ||
|
||
module | ||
.register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| { | ||
// create stream that doesn't produce items. | ||
let stream = futures::stream::empty::<usize>(); | ||
.register_subscription("subscribe_for_ever", "n", "unsubscribe_for_ever", |_, sink, mut tx| { | ||
// Create stream that produce one item then sleeps for an hour. | ||
let interval = interval(Duration::from_secs(60 * 60)); | ||
let stream = IntervalStream::new(interval).map(move |_| 0_usize); | ||
|
||
tokio::spawn(async move { | ||
sink.pipe_from_stream(stream).await.unwrap(); | ||
let send_back = Arc::make_mut(&mut tx); | ||
send_back.feed(()).await.unwrap(); | ||
send_back.send(()).await.unwrap(); | ||
}); | ||
|
||
Ok(()) | ||
}) | ||
.unwrap(); | ||
|
||
server.start(module).unwrap(); | ||
|
||
let client = WsClientBuilder::default().build(&server_url).await.unwrap(); | ||
let _sub1: Subscription<usize> = | ||
client.subscribe("subscribe_never_produce", None, "unsubscribe_never_produce").await.unwrap(); | ||
let _sub2: Subscription<usize> = | ||
client.subscribe("subscribe_never_produce", None, "unsubscribe_never_produce").await.unwrap(); | ||
let mut subs = Vec::new(); | ||
|
||
for _ in 0..10 { | ||
subs.push(client.subscribe::<usize>("subscribe_for_ever", None, "unsubscribe_for_ever").await.unwrap()); | ||
} | ||
|
||
// terminate connection. | ||
drop(client); | ||
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); | ||
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); | ||
|
||
let rx_len = rx.take(10).fold(0, |acc, _| async move { acc + 1 }).await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a chance that running There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it could a few milliseconds to send the WS close message but so not really is my understanding Then after connection is closed the actual subscriptions should be terminated using |
||
|
||
assert_eq!(rx_len, 10); | ||
} | ||
|
||
#[tokio::test] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test was flaky I don't know what I was thinking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Further, this could complete for
notify_one
successfully if thereceiver
from the connection is closed once the a new item is produced on the stream and istried to be sent
to the subscriber.so really tricky test to but I added sleep for one hour so should be "okeyisch" really tricky to test this.