From 35d8412ef263af61b638271446b3989fd1eb93fe Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Sun, 14 Apr 2024 18:46:11 +0200
Subject: [PATCH] Update doc

---
 ...onsuming-kafka-topics-using-zio-streams.md | 42 +++----------------
 1 file changed, 6 insertions(+), 36 deletions(-)

diff --git a/docs/consuming-kafka-topics-using-zio-streams.md b/docs/consuming-kafka-topics-using-zio-streams.md
index 97ebe508c4..3781565c8b 100644
--- a/docs/consuming-kafka-topics-using-zio-streams.md
+++ b/docs/consuming-kafka-topics-using-zio-streams.md
@@ -69,50 +69,20 @@ The examples above will keep processing records forever, or until the fiber is i
 
 zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing in-flight records to be fully processed.
 
-Use the `*withControl` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These return a ZIO producing a `SubscriptionStreamControl` and requiring a `Scope`. When provided with a `Scope`, e.g. using `ZIO.scoped { .. }`, the consumer remains subscribed to the given topics for the lifetime of the `Scope`. Commits are possible for as long as the consumer is subscribed. The graceful shutdown as described above may be initiated by calling `SubscriptionStreamControl#stop`.
+Use the `*withGracefulShutdown` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These methods accept a function that describes how the stream is processed; when the returned effect is interrupted, the stream is ended gracefully.
 
 ```scala
 import zio.Console.printLine
 import zio.kafka.consumer._
 
-ZIO.scoped {
-  for {
-    streamControl <- Consumer.partitionedStreamWithControl(
-      Subscription.topics("topic150"),
-      Serde.string,
-      Serde.string)
-    stream = streamControl.stream.flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
-      ZStream.fromZIO(
-        printLine(s"Starting stream for topic '${topicPartition.topic}' partition ${topicPartition.partition}")
-      ) *>
-        partitionStream
-          .tap(record => printLine(s"key: ${record.key}, value: ${record.value}")) // Replace with a custom message handling effect
-          .map(_.offset)
+Consumer.partitionedStreamWithGracefulShutdown(Subscription.topics("topic150"), Serde.string, Serde.string) { stream =>
+  stream.flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
+    partitionStream
+      .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
+      .map(_.offset)
   }
   .aggregateAsync(Consumer.offsetBatches)
   .mapZIO(_.commit)
   .runDrain
-    streamFiber <- stream.forkScoped
-    _ <- ZIO.sleep(10.seconds) *> streamControl.stop
-    _ <- streamFiber.join
-  } yield ()
-}
-```
-
-In most use cases you don't need to explicitly control when to shutdown the stream, but you simply want a graceful shutdown on interruption when the application terminates. The `Consumer#runWithGracefulShutdown` method implements this pattern and can be used as follows:
-
-```scala
-import zio.Console.printLine
-import zio.kafka.consumer._
-
-Consumer.runWithGracefulShutdown(Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)) { stream =>
-  stream.flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
-    partitionStream
-      .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
-      .map(_.offset)
-  }
-  .aggregateAsync(Consumer.offsetBatches)
-  .mapZIO(_.commit)
-  .runDrain
 }
 ```
\ No newline at end of file
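
A usage note alongside this patch: the documentation snippet added above assumes a `Consumer` is already available in the ZIO environment. Below is a minimal sketch of how that example might be wired into a runnable application, assuming the `partitionedStreamWithGracefulShutdown` accessor introduced by this change and the usual zio-kafka `ConsumerSettings`/`Consumer.make` layer construction; the bootstrap servers, group id, and object name are illustrative only, not part of the patch.

```scala
import zio._
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object GracefulConsumerExample extends ZIOAppDefault {

  // Illustrative settings: adjust bootstrap servers and group id for your environment.
  private val consumerLayer: ZLayer[Any, Throwable, Consumer] =
    ZLayer.scoped(
      Consumer.make(ConsumerSettings(List("localhost:9092")).withGroupId("example-group"))
    )

  // Same pipeline as in the updated documentation: consume all partitions in parallel,
  // print each record, and commit offsets in batches.
  private def consume =
    Consumer.partitionedStreamWithGracefulShutdown(
      Subscription.topics("topic150"),
      Serde.string,
      Serde.string
    ) { stream =>
      stream
        .flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
          partitionStream
            .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
            .map(_.offset)
        }
        .aggregateAsync(Consumer.offsetBatches)
        .mapZIO(_.commit)
        .runDrain
    }

  // Interrupting the application (for example on shutdown) triggers the graceful shutdown:
  // fetching stops, in-flight records are processed and committed, then the stream ends.
  def run = consume.provide(consumerLayer)
}
```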