From b4bb1d59f20d96a18a8a2688c20c6f9fce875778 Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Sat, 17 Nov 2018 02:53:08 -0800 Subject: [PATCH] Add list_consumer_group_offsets() Support fetching the offsets of a consumer group. Note: As far as I can tell (the Java code is a little inscrutable), the Java AdminClient doesn't allow specifying the `coordinator_id` or the `partitions`. But I decided to include them because they provide a lot of additional flexibility: 1. allowing users to specify the partitions allows this method to be used even for older brokers that don't support the OffsetFetchRequest_v2 2. allowing users to specify the coordinator ID gives them a way to bypass a network round trip. This method will frequently be used for monitoring, and if you've got 1,000 consumer groups that are being monitored once a minute, that's ~1.5M requests a day that are unnecessarily duplicated as the coordinator doesn't change unless there's an error. --- kafka/admin/kafka.py | 77 +++++++++++++++++++++++++++++++++++++++++++- kafka/structs.py | 1 + 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index befdd860a..224a660be 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -1,8 +1,12 @@ from __future__ import absolute_import +from collections import defaultdict import copy import logging import socket + +from kafka.vendor import six + from kafka.client_async import KafkaClient, selectors import kafka.errors as Errors from kafka.errors import ( @@ -12,8 +16,9 @@ from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest) -from kafka.protocol.commit import GroupCoordinatorRequest +from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest +from kafka.structs import TopicPartition, OffsetAndMetadata from 
kafka.version import __version__
 
 log = logging.getLogger(__name__)
@@ -585,5 +590,75 @@ def list_consumer_groups(self):
         # TODO this is completely broken, as it needs to send to the group coordinator
         # return self._send(request)
 
+    def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
+                                    partitions=None):
+        """Fetch Consumer Group Offsets.
+
+        Note:
+        This does not verify that the group_id or partitions actually exist
+        in the cluster.
+
+        As soon as any error is encountered, it is immediately raised.
+
+        :param group_id: The consumer group id name for which to fetch offsets.
+        :param group_coordinator_id: The node_id of the group's coordinator
+            broker. If set to None, will query the cluster to find the group
+            coordinator. Explicitly specifying this can be useful to prevent
+            that extra network round trip if you already know the group
+            coordinator. Default: None.
+        :param partitions: A list of TopicPartitions for which to fetch
+            offsets. On brokers >= 0.10.2, this can be set to None to fetch all
+            known offsets for the consumer group. Default: None.
+        :return dictionary: A dictionary with TopicPartition keys and
+            OffsetAndMetadata values. Partitions that are not specified and for
+            which the group_id does not have a recorded offset are omitted. An
+            offset value of `-1` indicates the group_id has no offset for that
+            TopicPartition. A `-1` can only happen for partitions that are
+            explicitly specified.
+        :raises ValueError: If partitions is None and the API version is <= 1.
+        :raises NotImplementedError: If the matched API version is > 3.
+        """
+        # Validate locally first so a doomed call costs no network round trip.
+        version = self._matching_api_version(OffsetFetchRequest)
+        if version > 3:
+            raise NotImplementedError("Support for OffsetFetch v{} has not yet been "
+                                      "added to KafkaAdmin.".format(version))
+        if partitions is None:
+            if version <= 1:
+                # Adjacent literals: triple quotes would embed newlines/indentation.
+                raise ValueError(
+                    "OffsetFetchRequest_v{} requires specifying the partitions for "
+                    "which to fetch offsets. Omitting the partitions is only supported "
+                    "on brokers >= 0.10.2. For details, see KIP-88.".format(version))
+            topics_partitions = None
+        else:
+            # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
+            topics_partitions_dict = defaultdict(set)
+            for topic, partition in partitions:
+                topics_partitions_dict[topic].add(partition)
+            topics_partitions = list(six.iteritems(topics_partitions_dict))
+        if group_coordinator_id is None:
+            group_coordinator_id = self._find_group_coordinator_id(group_id)
+        request = OffsetFetchRequest[version](group_id, topics_partitions)
+        response = self._send_request_to_node(group_coordinator_id, request)
+        if version > 1:  # OffsetFetchResponse_v1 lacks a top-level error_code
+            error_type = Errors.for_code(response.error_code)
+            if error_type is not Errors.NoError:
+                # optionally we could retry if error_type.retriable
+                raise error_type("Request '{}' failed with response '{}'."
+                                 .format(request, response))
+        # transform response into a dictionary with TopicPartition keys and
+        # OffsetAndMetadata values--this is what the Java AdminClient returns
+        group_offsets_listing = {}
+        for topic, topic_partitions in response.topics:  # don't shadow `partitions`
+            for partition, offset, metadata, error_code in topic_partitions:
+                error_type = Errors.for_code(error_code)
+                if error_type is not Errors.NoError:
+                    raise error_type(
+                        "Unable to fetch offsets for group_id {}, topic {}, partition {}"
+                        .format(group_id, topic, partition))
+                group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
+        return group_offsets_listing
+
     # delete groups protocol not yet implemented
     # Note: send the request to the group's coordinator.
diff --git a/kafka/structs.py b/kafka/structs.py index e15e92ed6..baacbcd43 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -72,6 +72,7 @@ ["topic", "partition", "leader", "replicas", "isr", "error"]) OffsetAndMetadata = namedtuple("OffsetAndMetadata", + # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata) ["offset", "metadata"]) OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",