Skip to content

Commit

Permalink
Making the Runloop should be an UIO
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jun 7, 2023
1 parent c6867ff commit c445263
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ private[consumer] object Runloop {
restartStreamsOnRebalancing: Boolean,
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
consumerSettings: ConsumerSettings
): ZIO[Scope, Throwable, Runloop] =
): URIO[Scope, Runloop] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
commandQueue <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](commandQueueSize))(_.shutdown)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ private[internal] object RunloopState {
private[consumer] final class RunloopAccess private (
runloopStateRef: Ref.Synchronized[RunloopState],
partitionHub: Hub[Take[Throwable, PartitionAssignment]],
makeRunloop: Task[Runloop],
makeRunloop: UIO[Runloop],
diagnostics: Diagnostics
) {
private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] =
runloopStateRef.updateSomeAndGetZIO {
case RunloopState.NotStarted if shouldStartIfNot =>
for {
promise <- Promise.make[Throwable, Runloop]
_ <- makeRunloop.foldZIO(promise.fail, promise.succeed).fork
_ <- makeRunloop.map(promise.succeed).fork
} yield RunloopState.Started(promise)
}
private def withRunloopZIO[R, A](shouldStartIfNot: Boolean)(f: Runloop => RIO[R, A]): RIO[R, A] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.kafka.consumer.internal

import zio.{ Executor, Scope, ZIO }
import zio.{ Executor, Scope, URIO, ZIO }

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong
Expand All @@ -9,16 +9,16 @@ private[consumer] object RunloopExecutor {

private val counter: AtomicLong = new AtomicLong(0)

private def newSingleThreadedExecutor(i: Long): ZIO[Scope, Throwable, Executor] =
private def newSingleThreadedExecutor(i: Long): URIO[Scope, Executor] =
ZIO.acquireRelease {
ZIO.attempt {
ZIO.succeed {
val javaExecutor =
Executors.newSingleThreadExecutor(runnable => new Thread(runnable, s"zio-kafka-runloop-thread-$i"))

Executor.fromJavaExecutor(javaExecutor) -> javaExecutor
}
} { case (_, executor) => ZIO.attempt(executor.shutdown()).orDie }.map(_._1)

val newInstance: ZIO[Scope, Throwable, Executor] = newSingleThreadedExecutor(counter.getAndIncrement())
val newInstance: URIO[Scope, Executor] = newSingleThreadedExecutor(counter.getAndIncrement())

}

0 comments on commit c445263

Please sign in to comment.