Skip to content

Commit

Permalink
WIP: PoC for fix of aio-libs#689
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Dec 2, 2020
1 parent 07e9bd3 commit b336d4f
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 22 deletions.
14 changes: 7 additions & 7 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
UnrecognizedBrokerVersion,
StaleMetadata)
from aiokafka.util import (
create_task, create_future, parse_kafka_version, get_running_loop
create_task, parse_kafka_version, get_running_loop
)


Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(self, *, loop=None, bootstrap_servers='localhost',
self._sync_task = None

self._md_update_fut = None
self._md_update_waiter = create_future()
self._md_update_waiter = loop.create_future()
self._get_conn_lock = asyncio.Lock()

def __repr__(self):
Expand Down Expand Up @@ -260,7 +260,7 @@ async def _md_synchronizer(self):

topics = self._topics
if self._md_update_fut is None:
self._md_update_fut = create_future()
self._md_update_fut = self._loop.create_future()
ret = await self._metadata_update(self.cluster, topics)
# If list of topics changed during metadata update we must update
# it again right away.
Expand All @@ -269,7 +269,7 @@ async def _md_synchronizer(self):
# Earlier this waiter was set before sending metadata_request,
# but that was to avoid topic list changes being unnoticed, which
# is handled explicitly now.
self._md_update_waiter = create_future()
self._md_update_waiter = self._loop.create_future()

self._md_update_fut.set_result(ret)
self._md_update_fut = None
Expand Down Expand Up @@ -344,7 +344,7 @@ def force_metadata_update(self):
# Wake up the `_md_synchronizer` task
if not self._md_update_waiter.done():
self._md_update_waiter.set_result(None)
self._md_update_fut = create_future()
self._md_update_fut = self._loop.create_future()
        # Metadata will be updated in the background by synchronizer
return asyncio.shield(self._md_update_fut)

Expand All @@ -364,7 +364,7 @@ def add_topic(self, topic):
topic (str): topic to track
"""
if topic in self._topics:
res = create_future()
res = self._loop.create_future()
res.set_result(True)
else:
res = self.force_metadata_update()
Expand All @@ -381,7 +381,7 @@ def set_topics(self, topics):
if not topics or set(topics).difference(self._topics):
res = self.force_metadata_update()
else:
res = create_future()
res = self._loop.create_future()
res.set_result(True)
self._topics = set(topics)
return res
Expand Down
8 changes: 4 additions & 4 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
GroupCoordinatorResponse_v0 as GroupCoordinatorResponse)

import aiokafka.errors as Errors
from aiokafka.util import create_future, create_task, get_running_loop
from aiokafka.util import create_task, get_running_loop

from aiokafka.abc import AbstractTokenProvider

Expand Down Expand Up @@ -203,7 +203,7 @@ def __del__(self, _warnings=warnings):

async def connect(self):
loop = self._loop
self._closed_fut = create_future()
self._closed_fut = self._loop.create_future()
if self._security_protocol in ["PLAINTEXT", "SASL_PLAINTEXT"]:
ssl = None
else:
Expand Down Expand Up @@ -436,7 +436,7 @@ def send(self, request, expect_response=True):

if not expect_response:
return self._writer.drain()
fut = create_future()
fut = self._loop.create_future()
self._requests.append((correlation_id, request.RESPONSE_TYPE, fut))
return asyncio.wait_for(fut, self._request_timeout)

Expand All @@ -458,7 +458,7 @@ def _send_sasl_token(self, payload, expect_response=True):
if not expect_response:
return self._writer.drain()

fut = create_future()
fut = self._loop.create_future()
self._requests.append((None, None, fut))
return asyncio.wait_for(fut, self._request_timeout)

Expand Down
7 changes: 4 additions & 3 deletions aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from aiokafka.record.memory_records import MemoryRecords
from aiokafka.record.control_record import ControlRecord, ABORT_MARKER
from aiokafka.structs import OffsetAndTimestamp, TopicPartition, ConsumerRecord
from aiokafka.util import create_future, create_task
from aiokafka.util import create_task

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -368,6 +368,7 @@ def __init__(
auto_offset_reset='latest',
isolation_level="read_uncommitted"):
self._client = client
self._loop = client._loop
self._key_deserializer = key_deserializer
self._value_deserializer = value_deserializer
self._fetch_min_bytes = fetch_min_bytes
Expand Down Expand Up @@ -440,7 +441,7 @@ def _create_fetch_waiter(self):
# Creating a fetch waiter is usually not that frequent of an operation,
# (get methods will return all data first, before a waiter is created)

fut = create_future()
fut = self._loop.create_future()
self._fetch_waiters.add(fut)
fut.add_done_callback(
lambda f, waiters=self._fetch_waiters: waiters.remove(f))
Expand Down Expand Up @@ -506,7 +507,7 @@ def on_done(fut, self=self):
assert assignment is not None and assignment.active

# Reset consuming signal future.
self._wait_consume_future = create_future()
self._wait_consume_future = self._loop.create_future()
# Determine what action to take per node
(fetch_requests, reset_requests, timeout, invalid_metadata,
resume_futures) = self._get_actions_per_node(assignment)
Expand Down
11 changes: 7 additions & 4 deletions aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from aiokafka.record.legacy_records import LegacyRecordBatchBuilder
from aiokafka.record.default_records import DefaultRecordBatchBuilder
from aiokafka.structs import RecordMetadata
from aiokafka.util import create_future
from aiokafka.util import create_future, get_running_loop


class BatchBuilder:
Expand Down Expand Up @@ -247,14 +247,17 @@ class MessageAccumulator:
"""
def __init__(
self, cluster, batch_size, compression_type, batch_ttl, *,
txn_manager=None):
txn_manager=None, loop=None):
if loop is None:
loop = get_running_loop()
self._loop = loop
self._batches = collections.defaultdict(collections.deque)
self._pending_batches = set([])
self._cluster = cluster
self._batch_size = batch_size
self._compression_type = compression_type
self._batch_ttl = batch_ttl
self._wait_data_future = create_future()
self._wait_data_future = loop.create_future()
self._closed = False
self._api_version = (0, 9)
self._txn_manager = txn_manager
Expand Down Expand Up @@ -413,7 +416,7 @@ def drain_by_nodes(self, ignore_nodes, muted_partitions=set()):
# task
if not self._wait_data_future.done():
self._wait_data_future.set_result(None)
self._wait_data_future = create_future()
self._wait_data_future = self._loop.create_future()

return nodes, unknown_leaders_exist

Expand Down
9 changes: 5 additions & 4 deletions aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def __init__(self, *, loop=None, bootstrap_servers='localhost',
sasl_oauth_token_provider=None):
if loop is None:
loop = get_running_loop()
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
self._loop = loop

if acks not in (0, 1, -1, 'all', _missing):
raise ValueError("Invalid ACKS parameter")
Expand Down Expand Up @@ -256,16 +259,14 @@ def __init__(self, *, loop=None, bootstrap_servers='localhost',
self._metadata = self.client.cluster
self._message_accumulator = MessageAccumulator(
self._metadata, max_batch_size, compression_attrs,
self._request_timeout_ms / 1000, txn_manager=self._txn_manager)
self._request_timeout_ms / 1000, txn_manager=self._txn_manager,
loop=loop)
self._sender = Sender(
self.client, acks=acks, txn_manager=self._txn_manager,
retry_backoff_ms=retry_backoff_ms, linger_ms=linger_ms,
message_accumulator=self._message_accumulator,
request_timeout_ms=request_timeout_ms)

self._loop = loop
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
self._closed = False

# Warn if producer was not closed properly
Expand Down
14 changes: 14 additions & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ async def test_producer_send(self):
with self.assertRaises(ProducerClosed):
await producer.send(self.topic, b'value', key=b'KEY')

def test_create_producer_no_running_loop(self):
    """Producer must be constructable and usable with an explicitly
    passed loop when no event loop is running (the aio-libs#689 case).

    Uses plain ``run_until_complete`` calls instead of the
    ``@run_until_complete`` decorator precisely because the point is
    that no loop is running at construction time.
    """
    loop = asyncio.new_event_loop()
    try:
        producer = AIOKafkaProducer(bootstrap_servers=self.hosts, loop=loop)
        loop.run_until_complete(producer.start())
        try:
            # send() returns a future resolving to RecordMetadata;
            # await it in a second run_until_complete.
            future = loop.run_until_complete(
                producer.send(self.topic, b'hello, Kafka!', partition=0))
            resp = loop.run_until_complete(future)
            self.assertEqual(resp.topic, self.topic)
            self.assertTrue(resp.partition in (0, 1))
            self.assertEqual(resp.offset, 0)
        finally:
            loop.run_until_complete(producer.stop())
    finally:
        # Close the loop we created — otherwise it leaks and triggers
        # a ResourceWarning in debug mode.
        loop.close()

@run_until_complete
async def test_producer_context_manager(self):
producer = AIOKafkaProducer(
Expand Down

0 comments on commit b336d4f

Please sign in to comment.