Skip to content

Commit

Permalink
Remove the retry mechanism in the RunloopAccess::stopConsumption me…
Browse files Browse the repository at this point in the history
…thod as asked by @erikvanoosten
  • Loading branch information
guizmaii committed Jun 24, 2023
1 parent 38a4ac1 commit 7558941
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag

//noinspection SimplifyAssertInspection
Expand Down Expand Up @@ -1140,50 +1139,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
)
},
test(
"Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption"
) {
val numberOfMessages: Int = 100000
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceMany(topic, kvs)
ref = new AtomicInteger(0)
// Starting a consumption session to start the Runloop.
fiber <-
consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.mapChunksZIO(chunks => ZIO.logDebug(s"Consumed ${ref.getAndAdd(chunks.size)} messages").as(chunks))
.take(numberOfMessages.toLong)
.runCount
.fork
_ <- consumer.stopConsumption
consumed_0 <- fiber.join
} yield assert(consumed_0)(isLessThan(numberOfMessages.toLong))

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
equalTo(
Chunk(
SubscriptionFinalized,
RunloopFinalized,
ConsumerFinalized
)
)
)
} @@ nonFlaky(5),
test(
"it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
) {
Expand Down
2 changes: 1 addition & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ object Consumer {
*/
override def stopConsumption: UIO[Unit] =
ZIO.logDebug("stopConsumption called") *>
runloopAccess.stopConsumption()
runloopAccess.stopConsumption

override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] =
consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio.{ durationInt, Hub, IO, Ref, Scope, UIO, ZIO, ZLayer }
import zio.{ Hub, IO, Ref, Scope, UIO, ZIO, ZLayer }

private[internal] sealed trait RunloopState
private[internal] object RunloopState {
Expand Down Expand Up @@ -42,26 +42,8 @@ private[consumer] final class RunloopAccess private (

/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*
* Note:
* 1. We do a 100 retries waiting 10ms between each to roughly take max 1s before to stop to retry. We want to avoid
* an infinite loop. We need this recursion because if the user calls `stopConsumption` before the Runloop is
* started, we need to wait for it to be started. Can happen if the user starts a consuming session in a forked
* fiber and immediately after forking, stops it. The Runloop will potentially not be started yet.
*/
// noinspection SimplifyUnlessInspection
def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = {
@inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false)

runloop(shouldStartIfNot = false).flatMap {
case RunloopState.Stopped => ZIO.unit
case RunloopState.Started(runloop) => runloop.stopConsumption
case RunloopState.NotStarted =>
if (retry <= 0) ZIO.unit
else if (initialCall) next
else next.delay(10.millis)
}
}
def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption)

/**
* We're doing all of these things in this method so that the interface of this class is as simple as possible and
Expand Down

0 comments on commit 7558941

Please sign in to comment.