Skip to content

Commit

Permalink
Fix the flakiness of DamperTest (#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar authored Jan 25, 2024
1 parent 23427ec commit 1b18746
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.concurrent.duration.FiniteDuration
*
* The trivial implementation would be the following:
* {{{
* class TrivialDumper[F[_]: Temporal](duration: FiniteDuration) extends Damper[F] {
* class TrivialDamper[F[_]: Temporal](duration: FiniteDuration) extends Damper[F] {
* def acquire: F[Unit] = Temporal[F].sleep(duration)
* def release: F[Unit] = ().pure[F]
* }
Expand Down Expand Up @@ -92,7 +92,29 @@ object Damper {
type WakeUp = Deferred[F, Option[Entry]]

object State {

/** There are no [[#acquire]] calls sleeping.
*
* @param acquired
* Number of [[#acquire]] calls without corresponding [[#release]]
* calls.
*/
final case class Idle(acquired: Acquired) extends State

/** There are some [[#acquire]] calls sleeping.
*
* @param acquired
* Number of [[#acquire]] calls without corresponding [[#release]]
* calls.
* @param entries
* List of the handles allowing specific [[#acquire]] call to happen.
* As soon as respective [[Entry]] is called, the fiber will stop
* waiting and `acquired` will increment.
* @param wakeUp
* The deferred marks currently ongoing delay. It will be completed by
* a value from `entries` when the sleep is over, or `None` if the
* sleep is interuptted by call of [[#release]].
*/
final case class Busy(acquired: Acquired, entries: Queue[Entry], wakeUp: WakeUp) extends State
}

Expand All @@ -111,16 +133,30 @@ object Damper {
)
}

/** Schedule a next sleep for queued entries.
*
* @param acquired
* Number of [[Damper#acquired]] calls without [[Damper#release]]
* calls.
* @param entries
* Wake up callacks for delayed [[Damper#acquired]] calls.
* @param effect
* Wake up callback for a first call to be allowed. It could be
* several `Entry` elements glued together, if the first one has a
* delay scheduled, and others have zero delays.
*/
@tailrec def idleOrBusy(acquired: Acquired, entries: Queue[Entry], effect: F[Unit]): Result = {
entries.dequeueOption match {
case Some((entry, entries)) =>
val delay = delayOf1(acquired)
if (delay.length == 0) {
if (delay.length == 0) {
// zero delay is expected, we can just glue the wake up calls together
idleOrBusy(
acquired + 1,
entries,
effect.productR { entry })
} else {
// non-zero delay is expected, let's schedule a sleep
val wakeUp = Deferred.unsafe[F, Option[Entry]]
(
State.Busy(acquired, entries, wakeUp),
Expand All @@ -129,6 +165,7 @@ object Damper {
}

case None =>
// no delays are expected, let's just execute accumulated wake up effect
idle(acquired, effect)
}
}
Expand All @@ -145,6 +182,7 @@ object Damper {
)
}

/** Update the state after [[Damper#acquire]] was allowed to happen */
def acquire = {
ref.modify {
case state: State.Idle => idle(state.acquired + 1, entry)
Expand All @@ -159,8 +197,13 @@ object Damper {
.race { Temporal[F].sleep(delay) }
result <- result match {
case Left(Some(`entry`)) =>
// the sleep did not finish and was interrupted
// by doing `wakeUp.complete` on this entry
acquire
case Left(_) =>
// the sleep did not finish and was interrupted
// by doing `wakeUp.complete` without this entry,
// i.e. `None`, for example, when doing cancelation
Clock[F]
.realTime
.flatMap { end =>
Expand All @@ -186,6 +229,7 @@ object Damper {
}

case Right(()) =>
// the sleep is finished and was not interrupted
acquire
}
result <- result
Expand Down Expand Up @@ -240,18 +284,26 @@ object Damper {
val acquired = state.acquired
val delay = delayOf1(acquired)
if (delay.length == 0) {
// zero delay is expected
// just increment number of acquired resources
// and return immediately
(
state.copy(acquired = acquired + 1),
().pure[F].pure[F]
)
} else {
// this is a first delay, so there is no queue yet,
// but we remember it to be first to wake up
// and also mark the state as busy, now
val wakeUp = Deferred.unsafe[F, Option[Entry]]
(
State.Busy(acquired, Queue.empty, wakeUp),
Defer[F].defer { start(entry, delay, wakeUp).as { await(filter = false) } }
)
}
case state: State.Busy =>
// we already have some delays in progress,
// so we add a new one to the waiting queue
(
state.copy(entries = state.entries.enqueue(entry)),
Defer[F].defer { await(filter = true).pure[F] }
Expand All @@ -267,11 +319,16 @@ object Damper {
ref
.modify {
case State.Idle(acquired) =>
// there are no delays in progress,
// so we just decrement number of acquired resources
(
State.Idle(acquired - 1),
().pure[F]
)
case state: State.Busy =>
// there are delays in progress,
// so we wake up the first entry now
// without waiting for delay to finish
(
state.copy(acquired = state.acquired - 1),
state.wakeUp.complete1(none)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
package com.evolutiongaming.kafka.journal

import cats.effect.{IO, Ref}
import cats.effect.{FiberIO, IO, Ref}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import org.scalatest.compatible.Assertion
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.matchers.should.Matchers
import com.evolutiongaming.kafka.journal.IOSuite._

import scala.concurrent.TimeoutException
import scala.concurrent.duration._


class DamperTest extends AsyncFunSuite with Matchers {

test(".release unblocks .acquire") {
val result = for {
damper <- Damper.of[IO](a => (a % 2) * 500.millis)
_ <- damper.acquire
fiber <- damper.acquire.start
result <- fiber.joinWithNever.timeout(10.millis).attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => () } }
_ <- assertSleeping(fiber)
_ <- damper.release
_ <- fiber.joinWithNever
} yield {}
Expand All @@ -30,11 +31,9 @@ class DamperTest extends AsyncFunSuite with Matchers {
damper <- Damper.of[IO](a => (a % 2) * 500.millis)
_ <- damper.acquire
fiber0 <- damper.acquire.start
result <- fiber0.joinWithNever.timeout(10.millis).attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => () } }
_ <- assertSleeping(fiber0)
fiber1 <- damper.acquire.start
result <- fiber1.joinWithNever.timeout(10.millis).attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => () } }
_ <- assertSleeping(fiber1)
_ <- damper.release
_ <- fiber0.joinWithNever
_ <- fiber1.joinWithNever
Expand All @@ -44,30 +43,40 @@ class DamperTest extends AsyncFunSuite with Matchers {

test("cancel") {
val result = for {
// start -> sleep(1.minute) -> fiber0 -> sleep(0.minutes) -> fiber1
ref <- Ref[IO].of(List(1.minute, 0.minutes))
damper <- Damper.of[IO] { _ =>
ref
.modify {
// always return 1.minute
// after two delays above from initial Ref are consumed
case Nil => (Nil, 1.minute)
case a :: as => (as, a)
}
.unsafeRunSync()
}

fiber0 <- damper.acquire.start
result <- fiber0.joinWithNever.timeout(10.millis).attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => () } }
_ <- assertSleeping(fiber0)

fiber1 <- damper.acquire.start
result <- fiber1.joinWithNever.timeout(10.millis).attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => () } }
_ <- assertSleeping(fiber1)

fiber2 <- damper.acquire.start
result <- fiber2.joinWithNever.timeout(10.millis).attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => () } }
_ <- assertSleeping(fiber2)

_ <- fiber1.cancel
_ <- fiber0.cancel

delays <- ref.get
_ <- IO {
assert(
delays.nonEmpty,
"delayOf was called more than once, the test result will not be meaningful," +
"consider increasing sleeping delay in `assertSleeping` to ensure the orderly" +
"start of fiber0, fiber1 and fiber2")
}

_ <- fiber2.joinWithNever
_ <- damper.release
} yield {}
Expand Down Expand Up @@ -107,4 +116,10 @@ class DamperTest extends AsyncFunSuite with Matchers {
} yield {}
result.run()
}

def assertSleeping(fiber: FiberIO[Unit]): IO[Assertion] =
fiber.joinWithNever.timeout(100.millis).attempt flatMap { result =>
IO { result should matchPattern { case Left(_: TimeoutException) => () } }
}

}

0 comments on commit 1b18746

Please sign in to comment.