Skip to content

Commit

Permalink
Add assertion before poll, add some documenting comments
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Nov 9, 2024
1 parent 9901b16 commit a9925d5
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ private[consumer] final class Runloop private (
pollResult <-
consumer.runloopAccess { c =>
for {
_ <- verifyAssignedStreamsMatchesAssignment(state.assignedStreams, c.assignment().asScala.toSet)
resumeAndPauseCounts <- resumeAndPausePartitions(c, partitionsToFetch)
(toResumeCount, toPauseCount) = resumeAndPauseCounts

Expand Down Expand Up @@ -573,16 +574,7 @@ private[consumer] final class Runloop private (
)
)
// Ensure that all assigned partitions have a stream and no streams are present for unassigned streams
_ <-
ZIO
.logWarning(
s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}"
)
.when(
currentAssigned != updatedAssignedStreams
.map(_.tp)
.toSet || currentAssigned.size != updatedAssignedStreams.size
)
_ <- verifyAssignedStreamsMatchesAssignment(updatedAssignedStreams, currentAssigned)
} yield Runloop.PollResult(
records = polledRecords,
ignoreRecordsForTps = ignoreRecordsForTps,
Expand All @@ -607,6 +599,16 @@ private[consumer] final class Runloop private (
)
}

private def verifyAssignedStreamsMatchesAssignment(
assignedStreams: Chunk[PartitionStreamControl],
currentAssigned: Set[TopicPartition]
) =
ZIO
.logWarning(
s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${assignedStreams.map(_.tp).mkString(",")}"
)
.when(currentAssigned != assignedStreams.map(_.tp).toSet || currentAssigned.size != assignedStreams.size)

/**
* Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded
* its poll interval, shutdown the consumer.
Expand Down Expand Up @@ -715,7 +717,9 @@ private[consumer] final class Runloop private (
}
}).tapBoth(cont.fail, _ => cont.succeed(()).unit)
case RunloopCommand.RemoveAllSubscriptions => doChangeSubscription(SubscriptionState.NotSubscribed)
case RunloopCommand.StopAllStreams =>
case RunloopCommand.StopAllStreams =>
// End all partition streams and any pending requests. Keep the runloop running so that commits for in-flight
// records can still be processed. Keep assigned streams as is to be consistent with consumer assignment
for {
_ <- ZIO.logDebug("Stop all streams initiated")
_ <- ZIO.foreachDiscard(state.assignedStreams)(_.end)
Expand Down Expand Up @@ -953,6 +957,13 @@ object Runloop {
private[internal] final case class State(
pendingRequests: Chunk[RunloopCommand.Request],
pendingCommits: Chunk[Runloop.Commit],

/**
* Streams for partitions that are currently assigned to this consumer.
*
* Before and after each rebalance, it should be consistent with the partitions that are assigned according to the
* underlying KafkaConsumer.
*/
assignedStreams: Chunk[PartitionStreamControl],
subscriptionState: SubscriptionState
) {
Expand Down

0 comments on commit a9925d5

Please sign in to comment.