diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 75e9f98ad..1336167ad 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -484,6 +484,7 @@ private[consumer] final class Runloop private (
       pollResult <-
         consumer.runloopAccess { c =>
           for {
+            _ <- verifyAssignedStreamsMatchesAssignment(state.assignedStreams, c.assignment().asScala.toSet)
             resumeAndPauseCounts <- resumeAndPausePartitions(c, partitionsToFetch)
             (toResumeCount, toPauseCount) = resumeAndPauseCounts
@@ -573,16 +574,7 @@ private[consumer] final class Runloop private (
                            )
                          )
             // Ensure that all assigned partitions have a stream and no streams are present for unassigned streams
-            _ <-
-              ZIO
-                .logWarning(
-                  s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}"
-                )
-                .when(
-                  currentAssigned != updatedAssignedStreams
-                    .map(_.tp)
-                    .toSet || currentAssigned.size != updatedAssignedStreams.size
-                )
+            _ <- verifyAssignedStreamsMatchesAssignment(updatedAssignedStreams, currentAssigned)
           } yield Runloop.PollResult(
             records = polledRecords,
             ignoreRecordsForTps = ignoreRecordsForTps,
@@ -607,6 +599,16 @@ private[consumer] final class Runloop private (
       )
   }
 
+  private def verifyAssignedStreamsMatchesAssignment(
+    assignedStreams: Chunk[PartitionStreamControl],
+    currentAssigned: Set[TopicPartition]
+  ) =
+    ZIO
+      .logWarning(
+        s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${assignedStreams.map(_.tp).mkString(",")}"
+      )
+      .when(currentAssigned != assignedStreams.map(_.tp).toSet || currentAssigned.size != assignedStreams.size)
+
   /**
    * Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded
    * its poll interval, shutdown the consumer.
@@ -715,7 +717,9 @@ private[consumer] final class Runloop private (
             }
         }).tapBoth(cont.fail, _ => cont.succeed(()).unit)
       case RunloopCommand.RemoveAllSubscriptions => doChangeSubscription(SubscriptionState.NotSubscribed)
-      case RunloopCommand.StopAllStreams =>
+      case RunloopCommand.StopAllStreams =>
+        // End all partition streams and any pending requests. Keep the runloop running so that commits for in-flight
+        // records can still be processed. Keep assigned streams as is to be consistent with consumer assignment
         for {
           _ <- ZIO.logDebug("Stop all streams initiated")
           _ <- ZIO.foreachDiscard(state.assignedStreams)(_.end)
@@ -953,6 +957,13 @@ object Runloop {
   private[internal] final case class State(
     pendingRequests: Chunk[RunloopCommand.Request],
     pendingCommits: Chunk[Runloop.Commit],
+
+    /**
+     * Streams for partitions that are currently assigned to this consumer.
+     *
+     * Before and after each rebalance, it should be consistent with the partitions that are assigned according to the
+     * underlying KafkaConsumer.
+     */
    assignedStreams: Chunk[PartitionStreamControl],
     subscriptionState: SubscriptionState
   ) {
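
A note on the extracted helper: the condition in verifyAssignedStreamsMatchesAssignment needs both halves. The following minimal standalone sketch (not part of the diff) exercises the same condition using hypothetical stand-in types TP and StreamControl in place of Kafka's TopicPartition and zio-kafka's PartitionStreamControl:

// Standalone sketch, assuming simplified stand-in types; plain Scala, no ZIO needed.
object AssignmentConsistencySketch extends App {
  final case class TP(topic: String, partition: Int)
  final case class StreamControl(tp: TP)

  // Mirrors the condition in verifyAssignedStreamsMatchesAssignment: the set
  // comparison detects missing or extra partitions, while the size comparison
  // additionally detects duplicate streams for the same partition, which the
  // set comparison alone cannot see (toSet collapses duplicates).
  def isInconsistent(assignedStreams: Seq[StreamControl], currentAssigned: Set[TP]): Boolean =
    currentAssigned != assignedStreams.map(_.tp).toSet ||
      currentAssigned.size != assignedStreams.size

  val tp0 = TP("topic", 0)
  val tp1 = TP("topic", 1)

  assert(!isInconsistent(Seq(StreamControl(tp0), StreamControl(tp1)), Set(tp0, tp1))) // consistent
  assert(isInconsistent(Seq(StreamControl(tp0)), Set(tp0, tp1)))                      // stream missing for tp1
  assert(isInconsistent(Seq(StreamControl(tp0), StreamControl(tp0)), Set(tp0)))       // duplicate stream for tp0
  println("all consistency checks behave as expected")
}

The size comparison may look redundant next to the set comparison, but it is what catches two streams for one partition: after mapping to a Set the duplicates disappear, so only the raw count reveals them.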