Commit 65dec9d

Merge branch 'master' into transaction-rebalance-safe-commits-followup

erikvanoosten authored Jan 13, 2025
2 parents e700fc4 + 77c631a commit 65dec9d

Showing 6 changed files with 44 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs

@@ -3,3 +3,6 @@ a8871d8d1aaea8d735d126676077829c3b3073a0
 
 # Scala Steward: Reformat with scalafmt 3.7.14
 021c238deea31b591ea19cfb5187666acebc5456
+
+# Scala Steward: Reformat with scalafmt 3.8.4
+795e9aeb3295061d10be0a8c2e1d4c84d291640f
2 changes: 1 addition & 1 deletion .scalafmt.conf

@@ -1,4 +1,4 @@
-version=3.8.3
+version=3.8.4
 project.git = true
 maxColumn = 120
 align {
4 changes: 2 additions & 2 deletions README.md

@@ -21,8 +21,8 @@ Kafka has a mature Java client for producing and consuming events, but it has a
 In order to use this library, we need to add the following line in our `build.sbt` file:
 
 ```scala
-libraryDependencies += "dev.zio" %% "zio-kafka" % "2.9.0"
-libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.9.0" % Test
+libraryDependencies += "dev.zio" %% "zio-kafka" % "2.9.1"
+libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.9.1" % Test
 ```
 
 Snapshots are available on Sonatype's snapshot repository https://oss.sonatype.org/content/repositories/snapshots.
37 changes: 35 additions & 2 deletions docs/preventing-duplicates.md

@@ -26,6 +26,39 @@ a partition.
 For this to work correctly, your program must process a chunk of records within max-rebalance-duration. The clock
 starts the moment the chunk is pushed into the stream and ends when the commits for these records complete.
 
-For more information see the scaladocs in `ConsumerSettings`, read the description of
-[pull request #1098](https://github.com/zio/zio-kafka/pull/1098) that introduced this feature, or watch the presentation
+In addition, your program must commit the offsets of consumed records. The most straightforward way is to commit to the
+Kafka brokers. This is done by calling `.commit` on the offset of consumed records (see the consumer documentation).
+However, there are more options: external commits and transactional producing.
+
+### Commit to an external system
+
+When you commit to an external system (e.g. by writing to a relational database) the zio-kafka consumer needs to know
+about those commits before it can work in rebalance-safe-commits mode. Inform zio-kafka about external commits by
+invoking the method `Consumer.registerExternalCommits(offsetBatch: OffsetBatch)` (available since zio-kafka 2.10.0).
+
+Here is what this could look like:
+
+```scala
+import zio.kafka.consumer._
+
+consumer.plainStream(Subscription.topics("topic2000"), Serde.string, Serde.string)
+  .mapZIO { record =>
+    database.store(record.offset) *> // <-- the external commit
+      consumer.registerExternalCommits(OffsetBatch(record.offset))
+  }
+  .runDrain
+```
+
+### Commit with a transactional producer
+
+Although transactional producing is possible with zio-kafka, it is not easy and the code is very messy (see
+`ConsumerSpec` for an example). Transactional producing cannot be used in combination with rebalance-safe-commits mode.
+
+Zio-kafka v3.0.0 will make transactional producing much easier.
+
+## More information
+
+There is more information in the scaladocs of `ConsumerSettings` and the description of
+[pull request #1098](https://github.com/zio/zio-kafka/pull/1098) that introduced this feature.
+You can also watch the presentation
 [Making ZIO-Kafka Safer And Faster](https://www.youtube.com/watch?v=MJoRwEyyVxM). The relevant part starts at 10:24.
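
The rebalance-safe-commits mode that the documentation above describes is enabled through `ConsumerSettings`. This is not part of the diff, just a minimal sketch, assuming the `withRebalanceSafeCommits` and `withMaxRebalanceDuration` builder methods described in the `ConsumerSettings` scaladocs; the broker address and group id are placeholders:

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings

// Sketch: enable rebalance-safe-commits so that a rebalance waits
// (up to max-rebalance-duration) for in-flight records to be committed.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092")) // placeholder bootstrap server
    .withGroupId("example-group")          // placeholder group id
    .withRebalanceSafeCommits(true)
    .withMaxRebalanceDuration(3.minutes)
```

With these settings, the clock described above (chunk pushed into the stream until its commits complete) must stay under the configured three minutes.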
@@ -731,12 +731,12 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         messagesReceived: Ref[List[(String, String)]],
         done: Promise[Nothing, Unit]
       ) =
-        consumeWithStrings(client, Some(group), subscription)({ record =>
+        consumeWithStrings(client, Some(group), subscription)(record =>
           for {
             messagesSoFar <- messagesReceived.updateAndGet(_ :+ (record.key() -> record.value()))
             _             <- ZIO.when(messagesSoFar.size == nrMessages)(done.succeed(()))
           } yield ()
-        }).fork
+        ).fork
 
       for {
         topic <- randomTopic
@@ -64,15 +64,14 @@ object TransactionalProducer {
       }
 
     override def createTransaction: ZIO[Scope, Throwable, Transaction] =
-      semaphore.withPermitScoped *> {
+      semaphore.withPermitScoped *>
         ZIO.acquireReleaseExit {
           for {
             offsetBatchRef <- Ref.make(OffsetBatch.empty)
             closedRef      <- Ref.make(false)
             _              <- ZIO.attemptBlocking(live.p.beginTransaction())
           } yield new TransactionImpl(producer = live, offsetBatchRef = offsetBatchRef, closed = closedRef)
         } { case (transaction: TransactionImpl, exit) => transaction.markAsClosed *> commitOrAbort(transaction, exit) }
-      }
   }
 
   def createTransaction: ZIO[TransactionalProducer & Scope, Throwable, Transaction] =