Skip to content

Commit

Permalink
Merge pull request #3307 from armanbilge/fix/resource-both-exit-case
Browse files Browse the repository at this point in the history
Fix propagation of `ExitCase` in `Resource#{both,combineK}`
  • Loading branch information
djspiewak authored Dec 24, 2022
2 parents 3ab83ce + be51ea6 commit fde936b
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 19 deletions.
61 changes: 42 additions & 19 deletions kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ sealed abstract class Resource[F[_], +A] extends Serializable {

private[effect] def fold[B](
onOutput: A => F[B],
onRelease: F[Unit] => F[Unit]
onRelease: (ExitCase => F[Unit], ExitCase) => F[Unit]
)(implicit F: MonadCancel[F, Throwable]): F[B] = {
sealed trait Stack[AA]
case object Nil extends Stack[A]
Expand All @@ -178,7 +178,7 @@ sealed abstract class Resource[F[_], +A] extends Serializable {
}
} {
case ((_, release), outcome) =>
onRelease(release(ExitCase.fromOutcome(outcome)))
onRelease(release, ExitCase.fromOutcome(outcome))
}
case Bind(source, fs) =>
loop(source, Frame(fs, stack))
Expand All @@ -204,7 +204,7 @@ sealed abstract class Resource[F[_], +A] extends Serializable {
* the result of applying [F] to
*/
def use[B](f: A => F[B])(implicit F: MonadCancel[F, Throwable]): F[B] =
fold(f, identity)
fold(f, _.apply(_))

/**
* For a resource that allocates an action (type `F[B]`), allocate that action, run it and
Expand Down Expand Up @@ -251,6 +251,10 @@ sealed abstract class Resource[F[_], +A] extends Serializable {
* _each_ of the two resources, nested finalizers are run in the usual reverse order of
* acquisition.
*
* The same [[Resource.ExitCase]] is propagated to every finalizer. If both resources acquired
* successfully, the [[Resource.ExitCase]] is determined by the outcome of [[use]]. Otherwise,
* it is determined by which resource failed or canceled first during acquisition.
*
* Note that `Resource` also comes with a `cats.Parallel` instance that offers more convenient
* access to the same functionality as `both`, for example via `parMapN`:
*
Expand Down Expand Up @@ -281,19 +285,31 @@ sealed abstract class Resource[F[_], +A] extends Serializable {
def both[B](
that: Resource[F, B]
)(implicit F: Concurrent[F]): Resource[F, (A, B)] = {
type Update = (F[Unit] => F[Unit]) => F[Unit]
type Finalizer = Resource.ExitCase => F[Unit]
type Update = (Finalizer => Finalizer) => F[Unit]

def allocate[C](r: Resource[F, C], storeFinalizer: Update): F[C] =
r.fold(_.pure[F], release => storeFinalizer(F.guarantee(_, release)))

val bothFinalizers = F.ref(F.unit -> F.unit)

Resource.make(bothFinalizers)(_.get.flatMap(_.parTupled).void).evalMap { store =>
val thisStore: Update = f => store.update(_.bimap(f, identity))
val thatStore: Update = f => store.update(_.bimap(identity, f))
r.fold(
_.pure[F],
(release, _) => storeFinalizer(fin => ec => F.unit >> fin(ec).guarantee(release(ec)))
)

val noop: Finalizer = _ => F.unit
val bothFinalizers = F.ref((noop, noop))

Resource
.makeCase(bothFinalizers) { (finalizers, ec) =>
finalizers.get.flatMap {
case (thisFin, thatFin) =>
F.void(F.both(thisFin(ec), thatFin(ec)))
}
}
.evalMap { store =>
val thisStore: Update = f => store.update(_.bimap(f, identity))
val thatStore: Update = f => store.update(_.bimap(identity, f))

(allocate(this, thisStore), allocate(that, thatStore)).parTupled
}
F.both(allocate(this, thisStore), allocate(that, thatStore))
}
}

/**
Expand Down Expand Up @@ -721,12 +737,19 @@ sealed abstract class Resource[F[_], +A] extends Serializable {
implicit F: MonadCancel[F, Throwable],
K: SemigroupK[F],
G: Ref.Make[F]): Resource[F, B] =
Resource.make(Ref[F].of(F.unit))(_.get.flatten).evalMap { finalizers =>
def allocate(r: Resource[F, B]): F[B] =
r.fold(_.pure[F], (release: F[Unit]) => finalizers.update(_.guarantee(release)))

K.combineK(allocate(this), allocate(that))
}
Resource
.makeCase(Ref[F].of((_: Resource.ExitCase) => F.unit))((fin, ec) =>
fin.get.flatMap(_(ec)))
.evalMap { finalizers =>
def allocate(r: Resource[F, B]): F[B] =
r.fold(
_.pure[F],
(release, _) =>
finalizers.update(fin => ec => F.unit >> fin(ec).guarantee(release(ec)))
)

K.combineK(allocate(this), allocate(that))
}

}

Expand Down
142 changes: 142 additions & 0 deletions tests/shared/src/test/scala/cats/effect/ResourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,84 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline {
leftReleased must beTrue
rightReleased must beTrue
}

"propagate the exit case" in {
import Resource.ExitCase

"use succesfully, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.both(Resource.unit).use(_ => IO.unit) must completeAs(())
got mustEqual ExitCase.Succeeded
}

"use successfully, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource.unit.both(r).use(_ => IO.unit) must completeAs(())
got mustEqual ExitCase.Succeeded
}

"use errored, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.both(Resource.unit).use(_ => IO.raiseError(ex)) must failAs(ex)
got mustEqual ExitCase.Errored(ex)
}

"use errored, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource.unit.both(r).use(_ => IO.raiseError(ex)) must failAs(ex)
got mustEqual ExitCase.Errored(ex)
}

"right errored, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.both(Resource.eval(IO.sleep(1.second) *> IO.raiseError(ex))).use_ must failAs(ex)
got mustEqual ExitCase.Errored(ex)
}

"left errored, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource.eval(IO.sleep(1.second) *> IO.raiseError(ex)).both(r).use_ must failAs(ex)
got mustEqual ExitCase.Errored(ex)
}

"use canceled, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.both(Resource.unit).use(_ => IO.canceled) must selfCancel
got mustEqual ExitCase.Canceled
}

"use canceled, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource.unit.both(r).use(_ => IO.canceled) must selfCancel
got mustEqual ExitCase.Canceled
}

"right canceled, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.both(Resource.eval(IO.sleep(1.second) *> IO.canceled)).use_ must selfCancel
got mustEqual ExitCase.Canceled
}

"left canceled, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource.eval(IO.sleep(1.second) *> IO.canceled).both(r).use_ must selfCancel
got mustEqual ExitCase.Canceled
}
}
}

"releases both resources on combineK" in ticked { implicit ticker =>
Expand Down Expand Up @@ -643,6 +721,70 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline {
lhs eqv rhs
}
}

"propagate the exit case" in {
import Resource.ExitCase

"use succesfully, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.combineK(Resource.unit).use(_ => IO.unit) must completeAs(())
got mustEqual ExitCase.Succeeded
}

"use errored, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.combineK(Resource.unit).use(_ => IO.raiseError(ex)) must failAs(ex)
got mustEqual ExitCase.Errored(ex)
}

"left errored, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec }) *>
Resource.eval(IO.raiseError(ex))
r.combineK(Resource.unit).use_ must completeAs(())
got mustEqual ExitCase.Succeeded
}

"left errored, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource.eval(IO.raiseError(ex)).combineK(r).use_ must completeAs(())
got mustEqual ExitCase.Succeeded
}

"left errored, use errored, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val ex = new Exception
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource
.eval(IO.raiseError(new Exception))
.combineK(r)
.use(_ => IO.raiseError(ex)) must failAs(ex)
got mustEqual ExitCase.Errored(ex)
}

"use canceled, test left" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
r.combineK(Resource.unit).use(_ => IO.canceled) must selfCancel
got mustEqual ExitCase.Canceled
}

"left errored, use canceled, test right" >> ticked { implicit ticker =>
var got: ExitCase = null
val r = Resource.onFinalizeCase(ec => IO { got = ec })
Resource
.eval(IO.raiseError(new Exception))
.combineK(r)
.use(_ => IO.canceled) must selfCancel
got mustEqual ExitCase.Canceled
}
}
}

"surround" should {
Expand Down

0 comments on commit fde936b

Please sign in to comment.