runWithGracefulShutdown
svroonland committed Mar 30, 2024
1 parent fe2db5d commit 1436472
Showing 5 changed files with 114 additions and 13 deletions.
49 changes: 49 additions & 0 deletions docs/consuming-kafka-topics-using-zio-streams.md
@@ -62,3 +62,52 @@ Consumer.partitionedStream(Subscription.topics("topic150"), Serde.string, Serde.
.mapZIO(_.commit)
.runDrain
```

## Controlled shutdown

The examples above keep processing records forever, or until the fiber is interrupted, typically at application shutdown. When interrupted, in-flight records are not processed fully through all stream stages and their offsets may not be committed. For a fast shutdown in an at-least-once processing scenario this is fine. zio-kafka also supports a _graceful shutdown_: fetching of records for the subscribed topics/partitions stops, the streams end and all downstream stages complete, so that all in-flight records are fully processed.
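
The difference between the two shutdown styles can be illustrated without Kafka. The following is a minimal sketch, not zio-kafka code: the object `InterruptVsStop` and its members are hypothetical names, and a queue of `Take` values stands in for the record source. Interrupting the fiber abandons the elements that were already fetched, while ending the stream lets the consumer finish them.

```scala
import zio._
import zio.stream._

// Hypothetical, Kafka-free model: a queue of `Take`s stands in for the record source.
object InterruptVsStop {

  // Feeds 1..5 to a slow consumer, applies `shutdown`, then reports what was fully processed.
  def run(shutdown: (Fiber.Runtime[Nothing, Unit], Queue[Take[Nothing, Int]]) => UIO[Any]): UIO[Chunk[Int]] =
    for {
      queue     <- Queue.unbounded[Take[Nothing, Int]]
      _         <- queue.offerAll((1 to 5).map(i => Take.single(i)))
      processed <- Ref.make(Chunk.empty[Int])
      started   <- Promise.make[Nothing, Unit]
      fiber <- ZStream
                 .fromQueue(queue)
                 .flattenTake[Nothing, Int]
                 .mapZIO(i => started.succeed(()) *> ZIO.sleep(100.millis) *> processed.update(_ :+ i))
                 .runDrain
                 .fork
      _    <- started.await // make sure processing of the first element has begun
      _    <- shutdown(fiber, queue)
      seen <- processed.get
    } yield seen

  // Fast shutdown: the fiber is interrupted while elements are still in flight.
  val interrupted: UIO[Chunk[Int]] = run((fiber, _) => fiber.interrupt)

  // Graceful shutdown: the stream is ended and the consumer is awaited.
  val graceful: UIO[Chunk[Int]] = run((fiber, queue) => queue.offer(Take.end) *> fiber.join)
}
```

In this model `graceful` yields all five elements, whereas `interrupted` yields only the elements that finished before the interruption arrived.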

Use the `*WithControl` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These return a ZIO that produces a `SubscriptionStreamControl` and requires a `Scope`. When provided with a `Scope`, e.g. using `ZIO.scoped { .. }`, the consumer remains subscribed to the given topics for the lifetime of the `Scope`. Graceful shutdown is initiated by calling `SubscriptionStreamControl#stop`.

```scala
import zio._
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde.Serde
import zio.stream.ZStream

ZIO.scoped {
  for {
    streamControl <- Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)
    stream = streamControl.stream.flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
               ZStream.fromZIO(printLine(s"Starting stream for topic '${topicPartition.topic}' partition ${topicPartition.partition}")) *>
                 partitionStream
                   .tap(record => printLine(s"key: ${record.key}, value: ${record.value}")) // Replace with a custom message handling effect
                   .map(_.offset)
             }
               .aggregateAsync(Consumer.offsetBatches)
               .mapZIO(_.commit)
               .runDrain
    streamFiber <- stream.forkScoped
    _           <- ZIO.sleep(10.seconds) *> streamControl.stop
    _           <- streamFiber.join
  } yield ()
}
```

In most use cases you don't need to explicitly control when to shut down the stream; you simply want a graceful shutdown on interruption when the application terminates. The `Consumer#runWithGracefulShutdown` method implements this pattern and can be used as follows:

```scala
import zio._
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde.Serde

ZIO.scoped {
  Consumer.runWithGracefulShutdown(Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)) { stream =>
    stream.flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
      partitionStream
        .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
        .map(_.offset)
    }
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .runDrain
  }
}
```
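
The pattern behind `runWithGracefulShutdown` can be tried out without a Kafka broker. The sketch below is an illustration under stated assumptions rather than the library implementation: `Control` is a hypothetical stand-in for `SubscriptionStreamControl`, `runGracefully` is a hypothetical name, and a queue of `Take` values plays the role of the record source. It forks the workflow and, on interruption, stops the source and waits for the workflow to drain.

```scala
import zio._
import zio.stream._

object GracefulShutdownSketch {

  // Hypothetical stand-in for SubscriptionStreamControl: a stream plus an effect that ends it.
  final case class Control[S](stream: S, stop: UIO[Unit])

  // Fork the workflow; when interrupted, end the stream and wait for the workflow
  // to finish instead of cancelling it.
  def runGracefully[S, R, E](control: Control[S])(f: S => ZIO[R, E, Any]): ZIO[R, E, Any] =
    ZIO.scoped[R] {
      for {
        fib    <- f(control.stream).forkScoped
        result <- fib.join.onInterrupt(control.stop *> fib.join.ignore)
      } yield result
    }

  // Interrupts the workflow while the first element is in flight and returns
  // the elements that were processed by the time the interruption completed.
  val program: UIO[Chunk[Int]] =
    for {
      queue     <- Queue.unbounded[Take[Nothing, Int]]
      _         <- queue.offerAll((1 to 5).map(i => Take.single(i)))
      processed <- Ref.make(Chunk.empty[Int])
      started   <- Promise.make[Nothing, Unit]
      control    = Control(ZStream.fromQueue(queue).flattenTake[Nothing, Int], queue.offer(Take.end).unit)
      fib <- runGracefully(control) { stream =>
               stream
                 .mapZIO(i => started.succeed(()) *> ZIO.sleep(50.millis) *> processed.update(_ :+ i))
                 .runDrain
             }.fork
      _    <- started.await *> ZIO.sleep(20.millis) // interrupt while element 1 is being processed
      _    <- fib.interrupt                         // returns only after the graceful drain finishes
      seen <- processed.get
    } yield seen
}
```

Because `fib.interrupt` waits for the `onInterrupt` finalizer, `program` should observe every element that was queued before the stream was ended, which mirrors how in-flight records are still processed and committed after interruption.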
Original file line number Diff line number Diff line change
@@ -356,7 +356,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertTrue(offset.map(_.offset).contains(9L))
} @@ TestAspect.nonFlaky(2),
suite("streamWithControl")(
- test("must end streams while still processing commits") {
+ test("stop must end streams while still processing commits") {
for {
topic <- randomTopic
group <- randomGroup
@@ -386,6 +386,39 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- keepProducing.set(false)
} yield assertCompletes
},
test("runWithGracefulShutdown must end streams while still processing commits") {
  val kvs   = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
  val topic = "test-run-with-graceful-shutdown"
  for {
    group            <- randomGroup
    client           <- randomClient
    _                <- produceMany(topic, kvs)
    messagesReceived <- Ref.make[Int](0)
    offset <- ZIO.scoped {
                for {
                  stop <- Promise.make[Nothing, Unit]
                  fib <-
                    Consumer
                      .runWithGracefulShutdown(
                        Consumer.plainStreamWithControl(Subscription.topics(topic), Serde.string, Serde.string)
                      ) { stream =>
                        stream.mapConcatZIO { record =>
                          for {
                            nr <- messagesReceived.updateAndGet(_ + 1)
                            _  <- stop.succeed(()).when(nr == 10)
                          } yield if (nr < 10) Seq(record.offset) else Seq.empty
                        }
                          .transduce(Consumer.offsetBatches)
                          .mapZIO(_.commit)
                          .runDrain
                      }
                      .forkScoped
                  _      <- stop.await *> fib.interrupt
                  offset <- Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head)
                } yield offset
              }.provideSomeLayer[Kafka](consumer(client, Some(group)))
  } yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
},
test("can handle stopping twice") {
for {
topic <- randomTopic
@@ -526,7 +559,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.tap(_ => stream1Started.succeed(()))
.zipWithIndex
.map(_._2)
.debug
.takeWhile(_ < 2 * kvs.size - 1)
.runDrain
.forkScoped
31 changes: 26 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -75,8 +75,8 @@ trait Consumer {
}

/**
- * Like [[partitionedAssignmentStream]] but returns a [[SubscriptionStreamControl]] used to do a controlled shutdown
- * of the stream
+ * Like [[partitionedAssignmentStream]] but returns a [[SubscriptionStreamControl]] used to do a graceful shutdown of
+ * the stream
*
* The returned scope determines the lifetime of the subscription of the kafka consumer, which is necessary to be able
* to commit offsets.
@@ -117,8 +117,7 @@ trait Consumer {
}

/**
- * Like [[partitionedStream]] but returns a [[SubscriptionStreamControl]] used to do a controlled shutdown of the
- * stream
+ * Like [[partitionedStream]] but returns a [[SubscriptionStreamControl]] used to do a graceful shutdown of the stream
*
* The returned scope determines the lifetime of the subscription of the kafka consumer, which is necessary to be able
* to commit offsets.
@@ -163,7 +162,7 @@ trait Consumer {
}

/**
- * Like [[plainStream]] but returns a [[SubscriptionStreamControl]] used to do a controlled shutdown of the stream
+ * Like [[plainStream]] but returns a [[SubscriptionStreamControl]] used to do a graceful shutdown of the stream
*
* The returned scope determines the lifetime of the subscription of the kafka consumer, which is necessary to be able
* to commit offsets.
@@ -390,6 +389,28 @@ object Consumer {
_.plainStreamWithControl(subscription, keyDeserializer, valueDeserializer, bufferSize)
)

/**
 * Takes a ZIO that produces a [[SubscriptionStreamControl]] for some stream and runs the given ZIO workflow on that
 * stream such that, when interrupted, it stops fetching records and gracefully waits for the ZIO workflow to complete.
 *
 * @param f
 *   Takes the stream as input and returns a ZIO workflow that processes the stream. The workflow cannot have a
 *   meaningful result value (`Any` type), because it is expected to run forever until external interruption. `f` is
 *   typically something like
 *   `stream => stream.tap(record => ZIO.debug(record)).mapZIO(_.offset.commit).runDrain`
 */
def runWithGracefulShutdown[StreamType, R, E](
  streamControl: ZIO[Scope with Consumer, E, SubscriptionStreamControl[StreamType]]
)(
  f: StreamType => ZIO[R, E, Any]
): ZIO[R with Consumer, E, Any] =
  ZIO.scoped[R with Consumer] {
    for {
      control <- streamControl
      fib     <- f(control.stream).forkScoped
      result  <- fib.join.onInterrupt(control.stop *> fib.join.ignore)
    } yield result
  }

/**
* Accessor method
*/
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import zio.UIO
* to be processed and their offsets committed.
*
* As long as this object is in scope, the Kafka consumer remains subscribed.
*
* @tparam StreamType
* Type of the stream returned from [[stream]]
*/
@@ -21,8 +22,8 @@ trait SubscriptionStreamControl[StreamType] {
* proceed (consumer remains subscribed)
*/
def stop: UIO[Unit]

}

object SubscriptionStreamControl {
def apply[T](stream0: T, stop0: UIO[Unit]): SubscriptionStreamControl[T] = new SubscriptionStreamControl[T] {
override def stream: T = stream0
Original file line number Diff line number Diff line change
@@ -68,11 +68,9 @@ private[consumer] final class RunloopAccess private (
}
} yield SubscriptionStreamControl(
ZStream.fromQueue(partitionAssignmentQueue),
- withRunloopZIO(requireRunning = true)(
- _.stopSubscribedTopicPartitions(subscription) *> partitionAssignmentQueue
- .offer(Take.end)
- .ignore
- )
+ withRunloopZIO(requireRunning = true)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
+ .offer(Take.end)
+ .ignore
)

}