Commit 35d8412: Update doc
svroonland committed Apr 14, 2024 (1 parent b599446)
Showing 1 changed file with 6 additions and 36 deletions: docs/consuming-kafka-topics-using-zio-streams.md
The examples above will keep processing records forever, or until the fiber is interrupted.

zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing in-flight records to be fully processed.

Use the `*withGracefulShutdown` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These methods accept a function that describes how the stream is consumed; the stream is gracefully ended when the effect returned by the method is interrupted.

```scala
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde._

Consumer.partitionedStreamWithGracefulShutdown(Subscription.topics("topic150"), Serde.string, Serde.string) { stream =>
  stream
    .flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
      partitionStream
        .tap(record => printLine(s"key: ${record.key}, value: ${record.value}")) // Replace with a custom message handling effect
        .map(_.offset)
    }
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(_.commit)
    .runDrain
}
```
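A graceful shutdown is therefore triggered simply by interrupting that effect, typically when the application terminates. Below is a minimal, illustrative sketch (not part of the zio-kafka documentation) of doing so explicitly: `stopAfter` is a hypothetical helper, `consume` stands for the effect built in the example above, and the delay is an arbitrary stand-in for the rest of the application's lifetime.

```scala
import zio._

// Sketch only: run the consuming effect in the background and interrupt it later.
// Interrupting the fiber triggers the graceful shutdown described in this section.
def stopAfter[R, E](delay: Duration)(consume: ZIO[R, E, Any]): ZIO[R, E, Unit] =
  for {
    fiber <- consume.fork     // run the consumer in the background
    _     <- ZIO.sleep(delay) // stand-in for the application's remaining lifetime
    _     <- fiber.interrupt  // interruption ends the stream gracefully,
                              // letting in-flight records finish processing
  } yield ()
```

For example, `stopAfter(10.seconds)(consume)` stops consuming after ten seconds while records that are already in flight are still processed and committed.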
