Skip to content

Commit

Permalink
Add list_consumer_group_offsets()
Browse files Browse the repository at this point in the history
Support fetching the offsets of a consumer group.

Note: As far as I can tell (the Java code is a little inscrutable), the
Java AdminClient doesn't allow specifying the `coordinator_id` or the
`partitions`.

But I decided to include them because they provide a lot of additional
flexibility:

1. allowing users to specify the partitions allows this method to be used even for
older brokers that don't support the OffsetFetchRequest_v2

2. allowing users to specify the coordinator ID gives them a way to
bypass a network round trip. This method will frequently be used for
monitoring, and if you've got 1,000 consumer groups that are being
monitored once a minute, that's ~1.5M requests a day that are
unnecessarily duplicated as the coordinator doesn't change unless
there's an error.
  • Loading branch information
jeffwidman committed Nov 18, 2018
1 parent 5069088 commit b4bb1d5
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
77 changes: 76 additions & 1 deletion kafka/admin/kafka.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import absolute_import

from collections import defaultdict
import copy
import logging
import socket

from kafka.vendor import six

from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
Expand All @@ -12,8 +16,9 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.version import __version__

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -585,5 +590,75 @@ def list_consumer_groups(self):
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)

def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
                                partitions=None):
    """Fetch Consumer Group Offsets.

    Note:
    This does not verify that the group_id or partitions actually exist
    in the cluster.

    As soon as any error is encountered, it is immediately raised.

    :param group_id: The consumer group id name for which to fetch offsets.
    :param group_coordinator_id: The node_id of the group's coordinator
        broker. If set to None, will query the cluster to find the group
        coordinator. Explicitly specifying this can be useful to prevent
        that extra network round trip if you already know the group
        coordinator. Default: None.
    :param partitions: A list of TopicPartitions for which to fetch
        offsets. On brokers >= 0.10.2, this can be set to None to fetch all
        known offsets for the consumer group. Default: None.
    :return dictionary: A dictionary with TopicPartition keys and
        OffsetAndMetadata values. Partitions that are not specified and for
        which the group_id does not have a recorded offset are omitted. An
        offset value of `-1` indicates the group_id has no offset for that
        TopicPartition. A `-1` can only happen for partitions that are
        explicitly specified.
    :raises ValueError: If `partitions` is None and the matched
        OffsetFetchRequest version is <= 1.
    :raises NotImplementedError: If the matched OffsetFetchRequest version
        is greater than 3.
    :raises: The error class returned by `Errors.for_code()` when the
        response carries a non-zero top-level or per-partition error code.
    """
    if group_coordinator_id is None:
        group_coordinator_id = self._find_group_coordinator_id(group_id)
    version = self._matching_api_version(OffsetFetchRequest)
    if version > 3:
        raise NotImplementedError(
            "Support for OffsetFetch v{} has not yet been added to KafkaAdmin."
            .format(version))

    if partitions is None:
        if version <= 1:
            # Built from adjacent literals so the message does not carry
            # the newlines and indentation of the source file.
            raise ValueError(
                "OffsetFetchRequest_v{} requires specifying the "
                "partitions for which to fetch offsets. Omitting the "
                "partitions is only supported on brokers >= 0.10.2. "
                "For details, see KIP-88.".format(version))
        topics_partitions = None
    else:
        # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
        topics_partitions_dict = defaultdict(set)
        for topic, partition in partitions:
            topics_partitions_dict[topic].add(partition)
        topics_partitions = list(six.iteritems(topics_partitions_dict))

    request = OffsetFetchRequest[version](group_id, topics_partitions)
    response = self._send_request_to_node(group_coordinator_id, request)
    if version > 1:  # OffsetFetchResponse_v1 lacks a top-level error_code
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            # optionally we could retry if error_type.retriable
            raise error_type(
                "Request '{}' failed with response '{}'."
                .format(request, response))

    # transform response into a dictionary with TopicPartition keys and
    # OffsetAndMetadata values--this is what the Java AdminClient returns
    group_offsets_listing = {}
    # A distinct loop name keeps the `partitions` argument from being rebound.
    for topic, partition_entries in response.topics:
        for partition, offset, metadata, error_code in partition_entries:
            error_type = Errors.for_code(error_code)
            if error_type is not Errors.NoError:
                raise error_type(
                    "Unable to fetch offsets for group_id {}, topic {}, partition {}"
                    .format(group_id, topic, partition))
            group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
    return group_offsets_listing

# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.
1 change: 1 addition & 0 deletions kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
["topic", "partition", "leader", "replicas", "isr", "error"])

# Record pairing an offset with its associated metadata.
# TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
OffsetAndMetadata = namedtuple(
    "OffsetAndMetadata",
    ("offset", "metadata"))

OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
Expand Down

0 comments on commit b4bb1d5

Please sign in to comment.