From d7ae70997ef552cf6bb0c8e1ad4d7ea1f791cb50 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 15 Jul 2023 11:34:17 +0200 Subject: [PATCH] Prevent asInstanceOf (#963) --- .../consumer/internal/RunloopAccess.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index e213b3001..6e62bf14f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -29,21 +29,22 @@ private[consumer] final class RunloopAccess private ( makeRunloop: UIO[Runloop], diagnostics: Diagnostics ) { - private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] = + + private def withRunloopZIO[E]( + requireRunning: Boolean + )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] = runloopStateRef.updateSomeAndGetZIO { - case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply) - } - private def withRunloopZIO[E, A](shouldStartIfNot: Boolean)(f: Runloop => IO[E, A]): IO[E, A] = - runloop(shouldStartIfNot).flatMap { - case RunloopState.Finalized => ZIO.unit.asInstanceOf[IO[E, A]] - case RunloopState.NotStarted => ZIO.unit.asInstanceOf[IO[E, A]] - case RunloopState.Started(runloop) => f(runloop) + case RunloopState.NotStarted if requireRunning => makeRunloop.map(RunloopState.Started.apply) + }.flatMap { + case RunloopState.NotStarted => ZIO.unit + case RunloopState.Started(runloop) => whenRunning(runloop) + case RunloopState.Finalized => ZIO.unit } /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. */ - def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption) + def stopConsumption: UIO[Unit] = withRunloopZIO(requireRunning = false)(_.stopConsumption) /** * We're doing all of these things in this method so that the interface of this class is as simple as possible and @@ -57,9 +58,9 @@ private[consumer] final class RunloopAccess private ( for { stream <- ZStream.fromHubScoped(partitionHub) // starts the Runloop if not already started - _ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription)) + _ <- withRunloopZIO(requireRunning = true)(_.addSubscription(subscription)) _ <- ZIO.addFinalizer { - withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <* + withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription)) <* diagnostics.emit(Finalization.SubscriptionFinalized) } } yield stream