Skip to content

Commit

Permalink
Prevent asInstanceOf (#963)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten authored Jul 15, 2023
1 parent c53414d commit d7ae709
Showing 1 changed file with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@ private[consumer] final class RunloopAccess private (
makeRunloop: UIO[Runloop],
diagnostics: Diagnostics
) {
private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] =

private def withRunloopZIO[E](
requireRunning: Boolean
)(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] =
runloopStateRef.updateSomeAndGetZIO {
case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply)
}
private def withRunloopZIO[E, A](shouldStartIfNot: Boolean)(f: Runloop => IO[E, A]): IO[E, A] =
runloop(shouldStartIfNot).flatMap {
case RunloopState.Finalized => ZIO.unit.asInstanceOf[IO[E, A]]
case RunloopState.NotStarted => ZIO.unit.asInstanceOf[IO[E, A]]
case RunloopState.Started(runloop) => f(runloop)
case RunloopState.NotStarted if requireRunning => makeRunloop.map(RunloopState.Started.apply)
}.flatMap {
case RunloopState.NotStarted => ZIO.unit
case RunloopState.Started(runloop) => whenRunning(runloop)
case RunloopState.Finalized => ZIO.unit
}

/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*/
def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption)
def stopConsumption: UIO[Unit] = withRunloopZIO(requireRunning = false)(_.stopConsumption)

/**
* We're doing all of these things in this method so that the interface of this class is as simple as possible and
Expand All @@ -57,9 +58,9 @@ private[consumer] final class RunloopAccess private (
for {
stream <- ZStream.fromHubScoped(partitionHub)
// starts the Runloop if not already started
_ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription))
_ <- withRunloopZIO(requireRunning = true)(_.addSubscription(subscription))
_ <- ZIO.addFinalizer {
withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <*
withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription)) <*
diagnostics.emit(Finalization.SubscriptionFinalized)
}
} yield stream
Expand Down

0 comments on commit d7ae709

Please sign in to comment.