From 7c101698cda28079b89fd01218905c4446cd9cea Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 16 Feb 2024 01:23:07 +0100 Subject: [PATCH 1/2] Run existing cancelableTakeTests also for unbounded queue (they fail) --- tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala b/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala index 12ec3dac1f..2dee60880b 100644 --- a/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/QueueSpec.scala @@ -372,6 +372,7 @@ class UnboundedQueueSpec extends BaseSpec with QueueTests[Queue] { commonTests(_ => constructor, _.offer(_), _.tryOffer(_), _.take, _.tryTake, _.size) batchTakeTests(_ => constructor, _.offer(_), _.tryTakeN(_)) batchOfferTests(_ => constructor, _.tryOfferN(_), _.tryTakeN(_)) + cancelableTakeTests(_ => constructor, _.offer(_), _.take) } } From de8b5d4408bb20fb144c2a7bc6450562280a5d13 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Fri, 16 Feb 2024 01:38:48 +0100 Subject: [PATCH 2/2] Fix it like in bounded --- .../main/scala/cats/effect/std/Queue.scala | 92 ++++++++++--------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Queue.scala b/std/shared/src/main/scala/cats/effect/std/Queue.scala index e07a2c5b4a..ba8138e873 100644 --- a/std/shared/src/main/scala/cats/effect/std/Queue.scala +++ b/std/shared/src/main/scala/cats/effect/std/Queue.scala @@ -820,52 +820,58 @@ object Queue { val size: F[Int] = F.delay(buffer.size()) - val take: F[A] = F defer { - try { - // attempt to take from the buffer. if it's empty, this will raise an exception - F.pure(buffer.take()) - } catch { - case FailureSignal => - // buffer was empty - // capture the fact that our retry succeeded and the value we were able to take - var received = false - var result: A = null.asInstanceOf[A] - - // a latch to block until some offerer wakes us up - val wait = F.async[Unit] { k => - F delay { - // register ourselves as a listener for offers - val clear = takers.put(k) - - try { - // now that we're registered, retry the take - result = buffer.take() - - // it worked! clear out our listener - clear() - // we got a result, so received should be true now - received = true - - // complete our own callback. see notes in offer about raced redundant completion - k(EitherUnit) - - // we *might* have negated a notification by succeeding here - // unnecessary wake-ups are mostly harmless (only slight fairness loss) - notifyOne() - - // don't bother with a finalizer since we're already complete - None - } catch { - case FailureSignal => - // println(s"failed take size = ${buffer.size()}") - // our retry failed, we're registered as a listener, so suspend - Some(F.delay(clear())) + val take: F[A] = F uncancelable { poll => + F defer { + try { + // attempt to take from the buffer. if it's empty, this will raise an exception + F.pure(buffer.take()) + } catch { + case FailureSignal => + // buffer was empty + // capture the fact that our retry succeeded and the value we were able to take + var received = false + var result: A = null.asInstanceOf[A] + + // a latch to block until some offerer wakes us up + val wait = F.asyncCheckAttempt[Unit] { k => + F delay { + // register ourselves as a listener for offers + val clear = takers.put(k) + + try { + // now that we're registered, retry the take + result = buffer.take() + + // it worked! clear out our listener + clear() + // we got a result, so received should be true now + received = true + + // we *might* have negated a notification by succeeding here + // unnecessary wake-ups are mostly harmless (only slight fairness loss) + notifyOne() + + // don't bother with a finalizer since we're already complete + EitherUnit + } catch { + case FailureSignal => + // println(s"failed take size = ${buffer.size()}") + // our retry failed, we're registered as a listener, so suspend + Left(Some(F.delay(clear()))) + } } } - } - // suspend until an offerer wakes us or our retry succeeds, then return a result - wait *> F.defer(if (received) F.pure(result) else take) + val notifyAnyway = F delay { + // we might have been awakened and canceled simultaneously + // try waking up another taker just in case + notifyOne() + } + + // suspend until an offerer wakes us or our retry succeeds, then return a result + (poll(wait) *> F.defer(if (received) F.pure(result) else poll(take))) + .onCancel(notifyAnyway) + } } }