Cancelling Async queue take makes other take hang #3998
Comments
This definitely looks like a bug in |
Yeah, I was trying to debug, that's why it became so complicated. Minimization is something like:
"foobar" in real {
  val test = Queue.unboundedForAsync[IO, String].flatMap { q =>
    for {
      fibers <- IO.both(q.take.start, q.take.start)
      (fib1, fib2) = fibers
      _ <- IO.sleep(0.1.second)
      _ <- IO.both(fib1.cancel, q.offer("foobar"))
      s <- fib1.joinWith(IO.pure(""))
      s <- if (s.nonEmpty) {
        IO.pure(s)
      } else {
        fib2.joinWithNever
      }
      _ <- IO(s mustEqual "foobar")
    } yield ()
  }
  test.parReplicateA_(10000).as(ok)
}
(Except this doesn't fail, it hangs.) |
Does this only happen with the unbounded queue? Or does it also happen with bounded? |
Only unbounded. And also, there is an existing test for (more or less) the same thing. It just doesn't run for unbounded. See #4001, where I try to fix this in the same way as it already works for bounded. |
Btw, this "let's wake up someone, just in case" seems exactly like something which could be avoided by #3553. |
This test fails:
Observations:
- unboundedForConcurrent (instead of unboundedForAsync) fixes it.
- fib1.cancel.productR(q.offer(Item)) instead of both (i.e., no race between cancel and offer) fixes it.
- HERE; I think this means that the item wasn't lost (fib2 received it), just the wakeup was lost, and then the fallback offer woke it up.
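
For comparison, here is a minimal standalone sketch of the sequenced variant from the second observation, where the cancel finishes before the offer starts, so the two cannot race. It is only an illustration: the object name is hypothetical, and it uses the public Queue.unbounded constructor (the unboundedForAsync constructor in the minimization is only reachable from inside the cats.effect package), on the assumption that this selects the async implementation for IO.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import scala.concurrent.duration._

// Hypothetical name; a sketch, not the test from the issue.
object SequencedCancelThenOffer extends IOApp.Simple {

  // One round: two takers block on an empty queue, the first one is
  // cancelled, and only after the cancel has completed is an item offered.
  val once: IO[String] =
    Queue.unbounded[IO, String].flatMap { q =>
      for {
        fib1 <- q.take.start
        fib2 <- q.take.start
        _ <- IO.sleep(100.millis) // give both takers time to suspend
        // productR sequences the two effects: no race between cancel and offer
        _ <- fib1.cancel.productR(q.offer("foobar"))
        s1 <- fib1.joinWith(IO.pure("")) // "" stands in for a cancelled taker
        s <- if (s1.nonEmpty) IO.pure(s1) else fib2.joinWithNever
      } yield s
    }

  val run: IO[Unit] =
    once.flatMap(s => IO.println(s"received: $s"))
}
```

Replacing the productR line with IO.both(fib1.cancel, q.offer("foobar")) restores the race from the minimization, which is the case reported to hang.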