Skip to content

Commit

Permalink
Merge branch 'master' into improve-doc
Browse files Browse the repository at this point in the history
  • Loading branch information
tvoinarovskyi authored Dec 30, 2021
2 parents a828a55 + d3fa429 commit 36d6e47
Show file tree
Hide file tree
Showing 114 changed files with 141 additions and 3,567 deletions.
30 changes: 21 additions & 9 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,21 +225,21 @@ jobs:
matrix:
include:
- python: 3.9
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"

# Older python versions against latest broker
- python: 3.6
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"
- python: 3.7
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"
- python: 3.8
kafka: "2.4.0"
scala: "2.12"
kafka: "2.8.1"
scala: "2.13"

# Older brokers against latest python version
# Older/newer brokers against latest python version
- python: 3.9
kafka: "0.9.0.1"
scala: "2.11"
Expand All @@ -261,6 +261,18 @@ jobs:
- python: 3.9
kafka: "2.3.1"
scala: "2.12"
- python: 3.9
kafka: "2.4.1"
scala: "2.12"
- python: 3.9
kafka: "2.5.1"
scala: "2.12"
- python: 3.9
kafka: "2.6.3"
scala: "2.12"
- python: 3.9
kafka: "2.7.2"
scala: "2.13"
fail-fast: false

steps:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@ aiokafka/record/_crecords/legacy_records.c
aiokafka/record/_crecords/memory_records.c
aiokafka/record/_crecords/cutil.c
aiokafka/record/_crecords/*.html

# pyenv
.python-version
1 change: 1 addition & 0 deletions CHANGES/731.doc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fix MyRebalancer on docs/consumer.rst
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Some simple testing tasks (sorry, UNIX only).

FLAGS?=--maxfail=3
SCALA_VERSION?=2.12
KAFKA_VERSION?=2.2.2
SCALA_VERSION?=2.13
KAFKA_VERSION?=2.8.1
DOCKER_IMAGE=aiolibs/kafka:$(SCALA_VERSION)_$(KAFKA_VERSION)
DIFF_BRANCH=origin/master
FORMATTED_AREAS=aiokafka/util.py aiokafka/structs.py
Expand Down
30 changes: 19 additions & 11 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
import time

from kafka.conn import collect_hosts
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.admin import DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka import __version__
from aiokafka.conn import create_conn, CloseReason
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.errors import (
KafkaError,
KafkaConnectionError,
Expand Down Expand Up @@ -581,13 +583,19 @@ def _check_api_version_response(self, response):
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((2, 3, 0), FetchRequest[0].API_KEY, 11),
((2, 1, 0), MetadataRequest[0].API_KEY, 7),
((1, 1, 0), FetchRequest[0].API_KEY, 7),
((1, 0, 0), MetadataRequest[0].API_KEY, 5),
((0, 11, 0), MetadataRequest[0].API_KEY, 4),
((0, 10, 2), OffsetFetchRequest[0].API_KEY, 2),
((0, 10, 1), MetadataRequest[0].API_KEY, 2),
# TODO Requires unreleased version of python-kafka
# ((2, 6, 0), DescribeClientQuotasRequest[0]),
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), FetchRequest[11]),
((2, 2, 0), OffsetRequest[5]),
((2, 1, 0), FetchRequest[10]),
((2, 0, 0), FetchRequest[8]),
((1, 1, 0), FetchRequest[7]),
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]

error_type = Errors.for_code(response.error_code)
Expand All @@ -597,8 +605,8 @@ def _check_api_version_response(self, response):
for api_key, _, max_version in response.api_versions
}
# Get the best match of test cases
for broker_version, api_key, version in test_cases:
if max_versions.get(api_key, -1) >= version:
for broker_version, struct in test_cases:
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
return broker_version

# We know that ApiVersionResponse is only supported in 0.10+
Expand Down
12 changes: 8 additions & 4 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,15 @@ async def connect(self):
self._idle_handle = loop.call_soon(
self._idle_check, weakref.ref(self))

if self._version_hint and self._version_hint >= (0, 10):
await self._do_version_lookup()
try:
if self._version_hint and self._version_hint >= (0, 10):
await self._do_version_lookup()

if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]:
await self._do_sasl_handshake()
if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]:
await self._do_sasl_handshake()
except: # noqa: E722
self.close()
raise

return reader, writer

Expand Down
2 changes: 1 addition & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import async_timeout
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.fetch import FetchRequest

from aiokafka.protocol.fetch import FetchRequest
import aiokafka.errors as Errors
from aiokafka.errors import (
ConsumerStoppedError, RecordTooLargeError, KafkaTimeoutError)
Expand Down
3 changes: 2 additions & 1 deletion aiokafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import time

from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.errors import (
Expand All @@ -14,7 +16,6 @@
OutOfOrderSequenceNumber, TopicAuthorizationFailedError,
GroupAuthorizationFailedError, TransactionalIdAuthorizationFailed,
OperationNotAttempted)
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.protocol.transaction import (
InitProducerIdRequest, AddPartitionsToTxnRequest, EndTxnRequest,
AddOffsetsToTxnRequest, TxnOffsetCommitRequest
Expand Down
212 changes: 0 additions & 212 deletions aiokafka/protocol/fetch.py

This file was deleted.

Loading

0 comments on commit 36d6e47

Please sign in to comment.