Skip to content

Commit

Permalink
Support for static membership (KIP-345) and required member ids (KIP-…
Browse files Browse the repository at this point in the history
…394)

  KIP-345 adds static membership support. This means that a short
  period of inactivity of a consumer (e.g. code update) does not trigger
  a rebalance if the consumer registers again within session_timeout_ms.
  For now, only the JoinGroupRequest/Response and SyncGroupRequest/Response
  is adapted (seems to be sufficient to use the new interface).

  One can use this feature by setting the group_instance_id of
  AIOKafkaConsumer to a unique value per group member (and retain the
  value during restarts).

  Also, a new interface for partition assignors (AbstractStaticPartitionAssignor)
  has been added so assignors can use the group_instance_ids of
  all group members. This could be beneficial in containerized deployments, so
  each container could be assigned to its previous partitions after a rebalance occured.

  KIP-394 has been introduced to detect invalid JoinGroupRequests on the broker
  • Loading branch information
Tobias Rauter committed Feb 28, 2020
1 parent 97e3bd2 commit cd44bdc
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 54 deletions.
2 changes: 2 additions & 0 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.protocol.group import SyncGroupRequest
from aiokafka.errors import (
KafkaError,
ConnectionError,
Expand Down Expand Up @@ -566,6 +567,7 @@ def _check_api_version_response(self, response):
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
((2, 3, 0), SyncGroupRequest[0].API_KEY, 3),
((2, 1, 0), MetadataRequest[0].API_KEY, 7),
((1, 1, 0), FetchRequest[0].API_KEY, 7),
((1, 0, 0), MetadataRequest[0].API_KEY, 5),
Expand Down
25 changes: 25 additions & 0 deletions aiokafka/consumer/assignors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
import abc

class AbstractStaticPartitionAssignor(AbstractPartitionAssignor):
"""
Abstract assignor implementation that also supports static assignments (KIP-345)
"""


@abc.abstractmethod
def assign(self, cluster, members, member_group_instance_ids):
"""Perform group assignment given cluster metadata, member subscriptions
and group_instance_ids
Arguments:
cluster (ClusterMetadata): metadata for use in assignment
members (dict of {member_id: MemberMetadata}): decoded metadata for
each member in the group.
mmember_group_instance_ids members (dict of {member_id: MemberMetadata}): decoded metadata for
each member in the group.
Returns:
dict: {member_id: MemberAssignment}
"""
pass
5 changes: 5 additions & 0 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class AIOKafkaConsumer(object):
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str or None): name of the group instance ID used for
static membership (KIP-345)
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable, optional): Any callable that takes a
Expand Down Expand Up @@ -219,6 +221,7 @@ def __init__(self, *topics, loop,
bootstrap_servers='localhost',
client_id='aiokafka-' + __version__,
group_id=None,
group_instance_id=None,
key_deserializer=None, value_deserializer=None,
fetch_max_wait_ms=500,
fetch_max_bytes=52428800,
Expand Down Expand Up @@ -277,6 +280,7 @@ def __init__(self, *topics, loop,
sasl_kerberos_domain_name=sasl_kerberos_domain_name)

self._group_id = group_id
self._group_instance_id = group_instance_id
self._heartbeat_interval_ms = heartbeat_interval_ms
self._session_timeout_ms = session_timeout_ms
self._retry_backoff_ms = retry_backoff_ms
Expand Down Expand Up @@ -385,6 +389,7 @@ async def start(self):
self._coordinator = GroupCoordinator(
self._client, self._subscription, loop=self._loop,
group_id=self._group_id,
group_instance_id=self._group_instance_id,
heartbeat_interval_ms=self._heartbeat_interval_ms,
session_timeout_ms=self._session_timeout_ms,
retry_backoff_ms=self._retry_backoff_ms,
Expand Down
154 changes: 100 additions & 54 deletions aiokafka/consumer/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import sys

from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
from kafka.protocol.commit import (
OffsetCommitRequest_v2 as OffsetCommitRequest,
OffsetFetchRequest_v1 as OffsetFetchRequest)
from kafka.protocol.group import (
from aiokafka.protocol.group import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)

import aiokafka.errors as Errors
Expand Down Expand Up @@ -218,6 +219,7 @@ class GroupCoordinator(BaseCoordinator):

def __init__(self, client, subscription, *, loop,
group_id='aiokafka-default-group',
group_instance_id=None,
session_timeout_ms=10000, heartbeat_interval_ms=3000,
retry_backoff_ms=100,
enable_auto_commit=True, auto_commit_interval_ms=5000,
Expand Down Expand Up @@ -292,6 +294,8 @@ def __init__(self, client, subscription, *, loop,
self._coordination_task = ensure_future(
self._coordination_routine(), loop=loop)

self._group_instance_id = group_instance_id

def traced_from_parent_span(self, span=None, *,
lazy=False,
**extra_context):
Expand Down Expand Up @@ -466,8 +470,10 @@ async def _perform_assignment(
'Invalid assignment protocol: %s' % assignment_strategy
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
group_instance_id_mapping = {}
for member_id, group_instance_id, metadata_bytes in members:
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
group_instance_id_mapping = group_instance_id
member_metadata[member_id] = metadata
all_subscribed_topics.update(metadata.subscription)

Expand All @@ -485,7 +491,12 @@ async def _perform_assignment(
" with subscriptions %s", self.group_id, assignor.name,
member_metadata)

assignments = assignor.assign(self._cluster, member_metadata)
assignments = None
if isinstance(assignor, AbstractStaticPartitionAssignor):
assignments = assignor.assign(self._cluster, member_metadata,
group_instance_id_mapping)
else:
assignments = assignor.assign(self._cluster, member_metadata)
log.debug("Finished assignment for group %s: %s",
self.group_id, assignments)

Expand Down Expand Up @@ -822,7 +833,7 @@ async def REPLACE_WITH_MEMBER_ID(self, subscription, prev_assignment):
self._start_heartbeat_task()
return subscription.assignment
return None

def _start_heartbeat_task(self):
if self._heartbeat_task is None:
self._heartbeat_task = ensure_future(
Expand Down Expand Up @@ -1316,45 +1327,65 @@ async def perform_group_join(self):
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)

if self._api_version < (0, 10, 1):
request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
else:
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
# for KIP-394 we may have to send a second join request
try_join = True
while(try_join):
try_join = False

if self._api_version < (0, 10, 1):
request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
elif self._api_version < (2, 3, 0):
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
else:
request = JoinGroupRequest[3](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
self._coordinator._group_instance_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)

# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await T(self._coordinator._send_req)(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None

# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await T(self._coordinator._send_req)(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None
if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None

error_type = Errors.for_code(response.error_code)

if error_type is Errors.MemberIdRequired:
self._coordinator.member_id = response.member_id
try_join = True

if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Join group response %s", response)
self._coordinator.member_id = response.member_id
Expand Down Expand Up @@ -1422,20 +1453,28 @@ async def perform_group_join(self):
err = error_type()
log.error(
"Unexpected error in join group '%s' response: %s",
self.group_id, err)
self.group_id, error_type)
raise Errors.KafkaError(repr(err))

return None

async def _on_join_follower(self):
# send follower's sync group with an empty assignment
T = self._coordinator.traced_from_parent_span(lazy=True)
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[])
if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[])
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
[])
log.debug(
"Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
Expand Down Expand Up @@ -1468,13 +1507,20 @@ async def _on_join_leader(self, response):
assignment = assignment.encode()
assignment_req.append((member_id, assignment))

version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req)

if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req)
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
assignment_req)
log.debug(
"Sending leader SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
Expand Down
9 changes: 9 additions & 0 deletions aiokafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,15 @@ class ListenerNotFound(BrokerResponseError):
)


class MemberIdRequired(BrokerResponseError):
errno = 79
message = 'MEMBER_ID_REQUIRED'
description = (
'Consumer needs to have a valid member '
'id before actually entering group'
)


def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and \
Expand Down
Loading

0 comments on commit cd44bdc

Please sign in to comment.