From cd44bdcf57b4aa6f675783ed1043b2d643b0ac54 Mon Sep 17 00:00:00 2001
From: Tobias Rauter
Date: Fri, 28 Feb 2020 18:39:08 +0100
Subject: [PATCH] Support for static membership (KIP-345) and required member
 ids (KIP-394)

KIP-345 adds static membership support. This means that a short period of
inactivity of a consumer (e.g. during a code update) does not trigger a
rebalance if the consumer registers again within session_timeout_ms.

For now, only JoinGroupRequest/Response and SyncGroupRequest/Response are
adapted, which appears to be sufficient to use the new interface. The feature
is enabled by setting the group_instance_id of AIOKafkaConsumer to a value
that is unique per group member and retained across restarts.

Also, a new interface for partition assignors
(AbstractStaticPartitionAssignor) has been added so assignors can use the
group_instance_ids of all group members. This can be beneficial in
containerized deployments, where each container can be assigned its previous
partitions after a rebalance has occurred.

Support for KIP-394 has been added so the broker can detect invalid
JoinGroupRequests: if the broker answers with MEMBER_ID_REQUIRED, the join
request is retried with the member id assigned by the broker.
---
 aiokafka/client.py                     |   2 +
 aiokafka/consumer/assignors.py         |  25 ++++
 aiokafka/consumer/consumer.py          |   5 +
 aiokafka/consumer/group_coordinator.py | 154 ++++++++++++++++---------
 aiokafka/errors.py                     |   9 ++
 aiokafka/protocol/group.py             |  76 ++++++++++++
 6 files changed, 217 insertions(+), 54 deletions(-)
 create mode 100644 aiokafka/consumer/assignors.py
 create mode 100644 aiokafka/protocol/group.py

diff --git a/aiokafka/client.py b/aiokafka/client.py
index 3ec5b99..397586f 100644
--- a/aiokafka/client.py
+++ b/aiokafka/client.py
@@ -13,6 +13,7 @@
 from aiokafka.cluster import ClusterMetadata
 from aiokafka.protocol.coordination import FindCoordinatorRequest
 from aiokafka.protocol.produce import ProduceRequest
+from aiokafka.protocol.group import SyncGroupRequest
 from aiokafka.errors import (
     KafkaError,
     ConnectionError,
@@ -566,6 +567,7 @@ def _check_api_version_response(self, response):
         # in descending order. As soon as we find one that works, return it
         test_cases = [
             # format (<broker version>, <needed struct>)
+            ((2, 3, 0), SyncGroupRequest[0].API_KEY, 3),
             ((2, 1, 0), MetadataRequest[0].API_KEY, 7),
             ((1, 1, 0), FetchRequest[0].API_KEY, 7),
             ((1, 0, 0), MetadataRequest[0].API_KEY, 5),
diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py
new file mode 100644
index 0000000..76e2572
--- /dev/null
+++ b/aiokafka/consumer/assignors.py
@@ -0,0 +1,25 @@
+from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+import abc
+
+class AbstractStaticPartitionAssignor(AbstractPartitionAssignor):
+    """
+    Abstract assignor implementation that also supports static assignments (KIP-345)
+    """
+
+
+    @abc.abstractmethod
+    def assign(self, cluster, members, member_group_instance_ids):
+        """Perform group assignment given cluster metadata, member subscriptions
+        and group_instance_ids
+
+        Arguments:
+            cluster (ClusterMetadata): metadata for use in assignment
+            members (dict of {member_id: MemberMetadata}): decoded metadata for
+                each member in the group.
+            member_group_instance_ids (dict of {member_id: group_instance_id}):
+                group instance ids for each member in the group.
+
+        Returns:
+            dict: {member_id: MemberAssignment}
+        """
+        pass
\ No newline at end of file
diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py
index 1165bb2..59cc95b 100644
--- a/aiokafka/consumer/consumer.py
+++ b/aiokafka/consumer/consumer.py
@@ -63,6 +63,8 @@ class AIOKafkaConsumer(object):
             committing offsets. If None, auto-partition assignment (via
             group coordinator) and offset commits are disabled. Default: None
+        group_instance_id (str or None): name of the group instance ID used for
+            static membership (KIP-345)
         key_deserializer (callable): Any callable that takes a
             raw message key and returns a deserialized key.
         value_deserializer (callable, optional): Any callable that takes a
@@ -219,6 +221,7 @@ def __init__(self, *topics, loop,
                  bootstrap_servers='localhost',
                  client_id='aiokafka-' + __version__,
                  group_id=None,
+                 group_instance_id=None,
                  key_deserializer=None, value_deserializer=None,
                  fetch_max_wait_ms=500, fetch_max_bytes=52428800,
@@ -277,6 +280,7 @@ def __init__(self, *topics, loop,
             sasl_kerberos_domain_name=sasl_kerberos_domain_name)

         self._group_id = group_id
+        self._group_instance_id = group_instance_id
         self._heartbeat_interval_ms = heartbeat_interval_ms
         self._session_timeout_ms = session_timeout_ms
         self._retry_backoff_ms = retry_backoff_ms
@@ -385,6 +389,7 @@ async def start(self):
             self._coordinator = GroupCoordinator(
                 self._client, self._subscription, loop=self._loop,
                 group_id=self._group_id,
+                group_instance_id=self._group_instance_id,
                 heartbeat_interval_ms=self._heartbeat_interval_ms,
                 session_timeout_ms=self._session_timeout_ms,
                 retry_backoff_ms=self._retry_backoff_ms,
diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py
index b03ea89..75176bf 100644
--- a/aiokafka/consumer/group_coordinator.py
+++ b/aiokafka/consumer/group_coordinator.py
@@ -6,11 +6,12 @@
 import sys

 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor
 from kafka.coordinator.protocol import ConsumerProtocol
 from kafka.protocol.commit import (
     OffsetCommitRequest_v2 as OffsetCommitRequest,
     OffsetFetchRequest_v1 as OffsetFetchRequest)
-from kafka.protocol.group import (
+from aiokafka.protocol.group import (
     HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)

 import aiokafka.errors as Errors
@@ -218,6 +219,7 @@ class GroupCoordinator(BaseCoordinator):
     def __init__(self, client, subscription, *, loop,
                  group_id='aiokafka-default-group',
+                 group_instance_id=None,
                  session_timeout_ms=10000, heartbeat_interval_ms=3000,
                  retry_backoff_ms=100,
                  enable_auto_commit=True, auto_commit_interval_ms=5000,
@@ -292,6 +294,8 @@ def __init__(self, client, subscription, *, loop,
         self._coordination_task = ensure_future(
             self._coordination_routine(), loop=loop)

+        self._group_instance_id = group_instance_id
+
     def traced_from_parent_span(self, span=None, *,
                                 lazy=False, **extra_context):
@@ -466,8 +470,10 @@ async def _perform_assignment(
             'Invalid assignment protocol: %s' % assignment_strategy
         member_metadata = {}
         all_subscribed_topics = set()
-        for member_id, metadata_bytes in members:
+        group_instance_id_mapping = {}
+        for member_id, group_instance_id, metadata_bytes in members:
             metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
+            group_instance_id_mapping[member_id] = group_instance_id
             member_metadata[member_id] = metadata
             all_subscribed_topics.update(metadata.subscription)
@@ -485,7 +491,12 @@ async def _perform_assignment(
             " with subscriptions %s", self.group_id, assignor.name,
             member_metadata)

-        assignments = assignor.assign(self._cluster, member_metadata)
+        assignments = None
+        if isinstance(assignor, AbstractStaticPartitionAssignor):
+            assignments = assignor.assign(self._cluster, member_metadata,
+                                          group_instance_id_mapping)
+        else:
+            assignments = assignor.assign(self._cluster, member_metadata)

         log.debug("Finished assignment for group %s: %s",
                   self.group_id, assignments)
@@ -822,7 +833,7 @@ async def ensure_active_group(self, subscription, prev_assignment):
             self._start_heartbeat_task()
             return subscription.assignment
         return None
-
+
     def _start_heartbeat_task(self):
         if self._heartbeat_task is None:
             self._heartbeat_task = ensure_future(
@@ -1316,45 +1327,65 @@ async def perform_group_join(self):
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)

-        if self._api_version < (0, 10, 1):
-            request = JoinGroupRequest[0](
-                self.group_id,
-                self._session_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
-        elif self._api_version < (0, 11, 0):
-            request = JoinGroupRequest[1](
-                self.group_id,
-                self._session_timeout_ms,
-                self._rebalance_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
-        else:
-            request = JoinGroupRequest[2](
-                self.group_id,
-                self._session_timeout_ms,
-                self._rebalance_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
+        # for KIP-394 we may have to send a second join request
+        try_join = True
+        while try_join:
+            try_join = False
+
+            if self._api_version < (0, 10, 1):
+                request = JoinGroupRequest[0](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+            elif self._api_version < (0, 11, 0):
+                request = JoinGroupRequest[1](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+            elif self._api_version < (2, 3, 0):
+                request = JoinGroupRequest[2](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+            else:
+                request = JoinGroupRequest[3](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    self._coordinator._group_instance_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+
+            # create the request for the coordinator
+            log.debug("Sending JoinGroup (%s) to coordinator %s",
+                      request, self.coordinator_id)
+            try:
+                response = await T(self._coordinator._send_req)(request)
+            except Errors.KafkaError:
+                # Return right away. It's a connection error, so backoff will be
+                # handled by coordinator lookup
+                return None
-        # create the request for the coordinator
-        log.debug("Sending JoinGroup (%s) to coordinator %s",
-                  request, self.coordinator_id)
-        try:
-            response = await T(self._coordinator._send_req)(request)
-        except Errors.KafkaError:
-            # Return right away. It's a connection error, so backoff will be
-            # handled by coordinator lookup
-            return None
+            if not self._subscription.active:
+                # Subscription changed. Ignore response and restart group join
+                return None
+
+            error_type = Errors.for_code(response.error_code)
+
+            if error_type is Errors.MemberIdRequired:
+                self._coordinator.member_id = response.member_id
+                try_join = True
-        if not self._subscription.active:
-            # Subscription changed. Ignore response and restart group join
-            return None
-        error_type = Errors.for_code(response.error_code)

         if error_type is Errors.NoError:
             log.debug("Join group response %s", response)
             self._coordinator.member_id = response.member_id
@@ -1422,7 +1453,7 @@ async def perform_group_join(self):
             err = error_type()
             log.error(
                 "Unexpected error in join group '%s' response: %s",
-                self.group_id, err)
+                self.group_id, error_type)
             raise Errors.KafkaError(repr(err))
         return None
@@ -1430,12 +1461,20 @@ async def _on_join_follower(self):
         # send follower's sync group with an empty assignment
         T = self._coordinator.traced_from_parent_span(lazy=True)
-        version = 0 if self._api_version < (0, 11, 0) else 1
-        request = SyncGroupRequest[version](
-            self.group_id,
-            self._coordinator.generation,
-            self._coordinator.member_id,
-            [])
+        if self._api_version < (2, 3, 0):
+            version = 0 if self._api_version < (0, 11, 0) else 1
+            request = SyncGroupRequest[version](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                [])
+        else:
+            request = SyncGroupRequest[2](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                self._coordinator._group_instance_id,
+                [])
         log.debug(
             "Sending follower SyncGroup for group %s to coordinator %s: %s",
             self.group_id, self.coordinator_id, request)
@@ -1468,13 +1507,20 @@ async def _on_join_leader(self, response):
                 assignment = assignment.encode()
             assignment_req.append((member_id, assignment))

-        version = 0 if self._api_version < (0, 11, 0) else 1
-        request = SyncGroupRequest[version](
-            self.group_id,
-            self._coordinator.generation,
-            self._coordinator.member_id,
-            assignment_req)
-
+        if self._api_version < (2, 3, 0):
+            version = 0 if self._api_version < (0, 11, 0) else 1
+            request = SyncGroupRequest[version](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                assignment_req)
+        else:
+            request = SyncGroupRequest[2](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                self._coordinator._group_instance_id,
+                assignment_req)
         log.debug(
             "Sending leader SyncGroup for group %s to coordinator %s: %s",
             self.group_id, self.coordinator_id, request)
diff --git a/aiokafka/errors.py b/aiokafka/errors.py
index 0d4b6e3..3c8ce41 100644
--- a/aiokafka/errors.py
+++ b/aiokafka/errors.py
@@ -412,6 +412,15 @@ class ListenerNotFound(BrokerResponseError):
     )


+class MemberIdRequired(BrokerResponseError):
+    errno = 79
+    message = 'MEMBER_ID_REQUIRED'
+    description = (
+        'Consumer needs to have a valid member '
+        'id before actually entering group'
+    )
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and \
diff --git a/aiokafka/protocol/group.py b/aiokafka/protocol/group.py
new file mode 100644
index 0000000..80125df
--- /dev/null
+++ b/aiokafka/protocol/group.py
@@ -0,0 +1,76 @@
+from __future__ import absolute_import
+
+from kafka.protocol.api import Request, Response
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+from kafka.protocol.group import *
+
+class JoinGroupResponse_v5(Response):
+    API_KEY = 11
+    API_VERSION = 5
+    SCHEMA = Schema(
+        ('throttle_time_ms', Int32),
+        ('error_code', Int16),
+        ('generation_id', Int32),
+        ('group_protocol', String('utf-8')),
+        ('leader_id', String('utf-8')),
+        ('member_id', String('utf-8')),
+        ('members', Array(
+            ('member_id', String('utf-8')),
+            ('group_instance_id', String('utf-8')),
+            ('member_metadata', Bytes)))
+    )
+
+
+
+class JoinGroupRequest_v5(Request):
+    API_KEY = 11
+    API_VERSION = 5
+    RESPONSE_TYPE = JoinGroupResponse_v5
+    SCHEMA = Schema(
+        ('group', String('utf-8')),
+        ('session_timeout', Int32),
+        ('rebalance_timeout', Int32),
+        ('member_id', String('utf-8')),
+        ('group_instance_id', String('utf-8')),
+        ('protocol_type', String('utf-8')),
+        ('group_protocols', Array(
+            ('protocol_name', String('utf-8')),
+            ('protocol_metadata', Bytes)))
+    )
+    UNKNOWN_MEMBER_ID = ''
+
+
+JoinGroupRequest = [
+    JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v5
+]
+JoinGroupResponse = [
+    JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, JoinGroupResponse_v5
+]
+
+
+class SyncGroupResponse_v3(Response):
+    API_KEY = 14
+    API_VERSION = 3
+    SCHEMA = SyncGroupResponse_v1.SCHEMA
+
+
+class SyncGroupRequest_v3(Request):
+    API_KEY = 14
+    API_VERSION = 3
+    RESPONSE_TYPE = SyncGroupResponse_v3
+    SCHEMA = Schema(
+        ('group', String('utf-8')),
+        ('generation_id', Int32),
+        ('member_id', String('utf-8')),
+        ('group_instance_id', String('utf-8')),
+        ('group_assignment', Array(
+            ('member_id', String('utf-8')),
+            ('member_metadata', Bytes)))
+    )
+
+
+
+SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v3]
+SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v3]
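
For reviewers, a minimal usage sketch of the new option (not part of the patch
itself). The topic name, bootstrap server and the way the instance id is chosen
are illustrative assumptions; only the group_instance_id argument is introduced
by this change:

    import asyncio
    from aiokafka import AIOKafkaConsumer

    async def consume(loop):
        consumer = AIOKafkaConsumer(
            "my-topic", loop=loop,
            bootstrap_servers="localhost:9092",
            group_id="my-group",
            # Stable and unique per group member, kept across restarts
            # (e.g. derived from a hostname or a StatefulSet ordinal).
            group_instance_id="my-group-member-1",
            # A restart that completes within this window does not trigger
            # a rebalance for a static member.
            session_timeout_ms=60000,
        )
        await consumer.start()
        try:
            async for msg in consumer:
                print(msg.topic, msg.partition, msg.offset)
        finally:
            await consumer.stop()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(consume(loop))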