Skip to content

Commit

Permalink
Remove RebalanceConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Apr 2, 2024
1 parent f4e334f commit 90fa193
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 36 deletions.

This file was deleted.

21 changes: 10 additions & 11 deletions zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ import scala.jdk.CollectionConverters._
* ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects.
*/
final case class RebalanceListener(
onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
onLost: (Set[TopicPartition], RebalanceConsumer) => Task[Unit]
onAssigned: Set[TopicPartition] => Task[Unit],
onRevoked: Set[TopicPartition] => Task[Unit],
onLost: Set[TopicPartition] => Task[Unit]
) {

/**
* Combine with another [[RebalanceListener]] and execute their actions sequentially
*/
def ++(that: RebalanceListener): RebalanceListener =
RebalanceListener(
(assigned, consumer) => onAssigned(assigned, consumer) *> that.onAssigned(assigned, consumer),
(revoked, consumer) => onRevoked(revoked, consumer) *> that.onRevoked(revoked, consumer),
(lost, consumer) => onLost(lost, consumer) *> that.onLost(lost, consumer)
assigned => onAssigned(assigned) *> that.onAssigned(assigned),
revoked => onRevoked(revoked) *> that.onRevoked(revoked),
lost => onLost(lost) *> that.onLost(lost)
)

def runOnExecutor(executor: Executor): RebalanceListener = RebalanceListener(
Expand All @@ -32,15 +32,14 @@ final case class RebalanceListener(
)

def toKafka(
runtime: Runtime[Any],
consumer: RebalanceConsumer
runtime: Runtime[Any]
): ConsumerRebalanceListener =
new ConsumerRebalanceListener {
override def onPartitionsRevoked(
partitions: java.util.Collection[TopicPartition]
): Unit = Unsafe.unsafe { implicit u =>
runtime.unsafe
.run(onRevoked(partitions.asScala.toSet, consumer))
.run(onRevoked(partitions.asScala.toSet))
.getOrThrowFiberFailure()
()
}
Expand All @@ -49,7 +48,7 @@ final case class RebalanceListener(
partitions: java.util.Collection[TopicPartition]
): Unit = Unsafe.unsafe { implicit u =>
runtime.unsafe
.run(onAssigned(partitions.asScala.toSet, consumer))
.run(onAssigned(partitions.asScala.toSet))
.getOrThrowFiberFailure()
()
}
Expand All @@ -58,7 +57,7 @@ final case class RebalanceListener(
partitions: java.util.Collection[TopicPartition]
): Unit = Unsafe.unsafe { implicit u =>
runtime.unsafe
.run(onLost(partitions.asScala.toSet, consumer))
.run(onLost(partitions.asScala.toSet))
.getOrThrowFiberFailure()
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private[consumer] final class Runloop private (
private[internal] def removeSubscription(subscription: Subscription): UIO[Unit] =
commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit

private def makeRebalanceListener(rc: RebalanceConsumer.Live): ConsumerRebalanceListener = {
private def makeRebalanceListener: ConsumerRebalanceListener = {
// All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This
// is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the
// rebalance listener.
Expand Down Expand Up @@ -242,7 +242,7 @@ private[consumer] final class Runloop private (
)

(recordRebalanceRebalancingListener ++ settings.rebalanceListener.runOnExecutor(topLevelExecutor))
.toKafka(sameThreadRuntime, rc)
.toKafka(sameThreadRuntime)
}

/** This is the implementation behind the user facing api `Offset.commit`. */
Expand Down Expand Up @@ -674,12 +674,12 @@ private[consumer] final class Runloop private (
.attempt(c.unsubscribe())
.as(Chunk.empty)
case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) =>
val rebalanceListener = makeRebalanceListener(RebalanceConsumer.Live(c))
val rebalanceListener = makeRebalanceListener
ZIO
.attempt(c.subscribe(pattern.pattern, rebalanceListener))
.as(Chunk.empty)
case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) =>
val rebalanceListener = makeRebalanceListener(RebalanceConsumer.Live(c))
val rebalanceListener = makeRebalanceListener
ZIO
.attempt(c.subscribe(topics.asJava, rebalanceListener))
.as(Chunk.empty)
Expand Down

0 comments on commit 90fa193

Please sign in to comment.