From d18fa76ddd8137ca4d54073d4e12c4c15ecdf43f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 04:03:55 +0000 Subject: [PATCH] Add some uncancelables --- .../cats/effect/unsafe/EpollSystem.scala | 96 ++++++++++--------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 314c51f492..d7243a441c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -109,58 +109,62 @@ object EpollSystem extends PollingSystem { def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = readSemaphore.permit.surround { - def go(a: A, before: Int): IO[B] = - f(a).flatMap { - case Left(a) => - IO(readReadyCounter).flatMap { after => - if (before != after) - // there was a read-ready notification since we started, try again immediately - go(a, after) - else - IO.async[Int] { cb => - IO { - readCallback = cb - // check again before we suspend - val now = readReadyCounter - if (now != before) { - cb(Right(now)) - readCallback = null - None - } else Some(IO(this.readCallback = null)) - } - }.flatMap(go(a, _)) - } - case Right(b) => IO.pure(b) - } + IO.uncancelable { poll => + def go(a: A, before: Int): IO[B] = + poll(f(a)).flatMap { + case Left(a) => + IO(readReadyCounter).flatMap { after => + if (before != after) + // there was a read-ready notification since we started, try again immediately + go(a, after) + else + poll(IO.async[Int] { cb => + IO { + readCallback = cb + // check again before we suspend + val now = readReadyCounter + if (now != before) { + cb(Right(now)) + readCallback = null + None + } else Some(IO(this.readCallback = null)) + } + }).flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + } IO(readReadyCounter).flatMap(go(a, _)) } def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = writeSemaphore.permit.surround { - def go(a: A, before: Int): IO[B] = - f(a).flatMap { - case Left(a) => - IO(writeReadyCounter).flatMap { after => - if (before != after) - // there was a write-ready notification since we started, try again immediately - go(a, after) - else - IO.async[Int] { cb => - IO { - writeCallback = cb - // check again before we suspend - val now = writeReadyCounter - if (now != before) { - cb(Right(now)) - writeCallback = null - None - } else Some(IO(this.writeCallback = null)) - } - }.flatMap(go(a, _)) - } - case Right(b) => IO.pure(b) - } + IO.uncancelable { poll => + def go(a: A, before: Int): IO[B] = + poll(f(a)).flatMap { + case Left(a) => + IO(writeReadyCounter).flatMap { after => + if (before != after) + // there was a write-ready notification since we started, try again immediately + go(a, after) + else + poll(IO.async[Int] { cb => + IO { + writeCallback = cb + // check again before we suspend + val now = writeReadyCounter + if (now != before) { + cb(Right(now)) + writeCallback = null + None + } else Some(IO(this.writeCallback = null)) + } + }).flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + } IO(writeReadyCounter).flatMap(go(a, _)) }