Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix disabled aiokafka unit tests #610

Merged
merged 16 commits into from
Apr 1, 2024
6 changes: 3 additions & 3 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,14 +840,14 @@ def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
poll_at = None
aiotp_state = assignment.state_value(aiotp)
if aiotp_state and aiotp_state.timestamp:
poll_at = aiotp_state.timestamp / 1000
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this change was originally implemented in dcc421f by @patkivikram, could you expand on this?

poll_at = aiotp_state.timestamp
if poll_at is None:
if secs_since_started >= self.tp_fetch_request_timeout_secs:
# NO FETCH REQUEST SENT AT ALL SINCE WORKER START
self.log.error(
SLOW_PROCESSING_NO_FETCH_SINCE_START,
tp,
secs_since_started,
humanize_seconds_ago(secs_since_started),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally reverted this in master, ignore.

)
return True

Expand All @@ -857,7 +857,7 @@ def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
self.log.error(
SLOW_PROCESSING_NO_RECENT_FETCH,
tp,
secs_since_request,
humanize_seconds_ago(secs_since_request),
)
return True

Expand Down
97 changes: 68 additions & 29 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from mode.utils import text
from mode.utils.futures import done_future
from mode.utils.times import humanize_seconds_ago
from opentracing.ext import tags

import faust
Expand Down Expand Up @@ -234,7 +235,7 @@ def mock_record(
serialized_value_size=40,
**kwargs,
):
return Mock(
return MagicMock(
name="record",
topic=topic,
partition=partition,
Expand Down Expand Up @@ -281,8 +282,8 @@ def start_span(operation_name=None, **kwargs):
return tracer

@pytest.fixture()
def _consumer(self):
return Mock(
def _consumer(self, now, cthread, tp):
_consumer = Mock(
name="AIOKafkaConsumer",
autospec=aiokafka.AIOKafkaConsumer,
start=AsyncMock(),
Expand All @@ -293,6 +294,17 @@ def _consumer(self):
_client=Mock(name="Client", close=AsyncMock()),
_coordinator=Mock(name="Coordinator", close=AsyncMock()),
)
_consumer.assignment.return_value = {tp}

(
_consumer._fetcher._subscriptions.subscription.assignment.state_value
).return_value = MagicMock(
assignment={tp},
timestamp=now,
highwater=1,
position=0,
)
return _consumer

@pytest.fixture()
def now(self):
Expand Down Expand Up @@ -465,18 +477,22 @@ def test_timed_out(self, *, cthread, _consumer, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_no_recent_fetch(Test_verify_event_path_base):
def test_recent_fetch(self, *, cthread, now, tp, logger):
self._set_last_response(now - 30.0)
self._set_last_request(now - 2.0)
assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

def test_timed_out(self, *, cthread, now, tp, logger):
def test_timed_out(self, *, cthread, now, tp, logger, _consumer):
self._set_last_response(now - 30.0)
self._set_last_request(now - cthread.tp_fetch_request_timeout_secs * 2)
assert cthread.verify_event_path(now, tp) is None
assert (
cthread.verify_event_path(
now + cthread.tp_fetch_request_timeout_secs * 2, tp
)
is None
)
logger.error.assert_called_with(
mod.SLOW_PROCESSING_NO_RECENT_FETCH,
ANY,
Expand All @@ -503,29 +519,46 @@ def test_timed_out(self, *, cthread, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_no_highwater_since_start(Test_verify_event_path_base):
highwater = None

def test_no_monitor(self, *, app, cthread, now, tp, logger):
def test_no_monitor(self, *, app, cthread, now, tp, logger, _consumer):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now)
app.monitor = None
assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

def test_just_started(self, *, cthread, now, tp, logger):
def test_just_started(self, *, cthread, now, tp, logger, _consumer):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now)
assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

def test_timed_out(self, *, cthread, now, tp, logger):
def test_timed_out(self, *, cthread, now, tp, logger, _consumer):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now - cthread.tp_stream_timeout_secs * 2)
_consumer.assignment.return_value = {tp}
assignment = cthread.assignment()

assert assignment == {tp}
fetcher = _consumer._fetcher
(fetcher._subscriptions.subscription.assignment.state_value).return_value = (
MagicMock(
assignment=assignment,
timestamp=now,
highwater=None,
tp_stream_timeout_secs=cthread.tp_stream_timeout_secs,
tp_fetch_request_timeout_secs=cthread.tp_fetch_request_timeout_secs,
)
)
(
fetcher._subscriptions.subscription.assignment.state_value.timestamp
).return_value = now

assert cthread.verify_event_path(now, tp) is None
logger.error.assert_called_with(
mod.SLOW_PROCESSING_NO_HIGHWATER_SINCE_START,
Expand All @@ -534,7 +567,6 @@ def test_timed_out(self, *, cthread, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_stream_idle_no_highwater(Test_verify_event_path_base):
highwater = 10
committed_offset = 10
Expand All @@ -547,7 +579,6 @@ def test_highwater_same_as_offset(self, *, cthread, now, tp, logger):
logger.error.assert_not_called()


@pytest.mark.skip("Needs fixing")
class Test_VEP_stream_idle_highwater_no_acks(Test_verify_event_path_base):
acks_enabled = False

Expand All @@ -559,7 +590,6 @@ def test_no_acks(self, *, cthread, now, tp, logger):
logger.error.assert_not_called()


@pytest.mark.skip("Needs fixing")
class Test_VEP_stream_idle_highwater_same_has_acks_everything_OK(
Test_verify_event_path_base
):
Expand All @@ -572,6 +602,7 @@ def test_main(self, *, cthread, now, tp, logger):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now)

assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

Expand Down Expand Up @@ -636,7 +667,6 @@ def test_inbound_timed_out(self, *, app, cthread, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_no_commit(Test_verify_event_path_base):
highwater = 20
committed_offset = 10
Expand Down Expand Up @@ -664,13 +694,13 @@ def test_timed_out_since_start(self, *, app, cthread, now, tp, logger):
expected_message = cthread._make_slow_processing_error(
mod.SLOW_PROCESSING_NO_COMMIT_SINCE_START,
[mod.SLOW_PROCESSING_CAUSE_COMMIT],
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
)
logger.error.assert_called_once_with(
expected_message,
tp,
ANY,
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
humanize_seconds_ago(cthread.tp_commit_timeout_secs * 2),
)

def test_timed_out_since_last(self, *, app, cthread, now, tp, logger):
Expand All @@ -681,13 +711,13 @@ def test_timed_out_since_last(self, *, app, cthread, now, tp, logger):
expected_message = cthread._make_slow_processing_error(
mod.SLOW_PROCESSING_NO_RECENT_COMMIT,
[mod.SLOW_PROCESSING_CAUSE_COMMIT],
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
)
logger.error.assert_called_once_with(
expected_message,
tp,
ANY,
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
humanize_seconds_ago(now - cthread.tp_commit_timeout_secs * 4),
)

def test_committing_fine(self, *, app, cthread, now, tp, logger):
Expand Down Expand Up @@ -1344,26 +1374,30 @@ def assert_new_producer(
max_batch_size=16384,
max_request_size=1000000,
request_timeout_ms=1200000,
metadata_max_age_ms=300000,
connections_max_idle_ms=540000,
security_protocol="PLAINTEXT",
**kwargs,
):
with patch("aiokafka.AIOKafkaProducer") as AIOKafkaProducer:
p = producer._new_producer()
assert p is AIOKafkaProducer.return_value
AIOKafkaProducer.assert_called_once_with(
acks=acks,
api_version=api_version,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
compression_type=compression_type,
acks=acks,
linger_ms=linger_ms,
max_batch_size=max_batch_size,
max_request_size=max_request_size,
request_timeout_ms=request_timeout_ms,
compression_type=compression_type,
security_protocol=security_protocol,
loop=producer.loop,
partitioner=producer.partitioner,
transactional_id=None,
api_version=api_version,
metadata_max_age_ms=metadata_max_age_ms,
connections_max_idle_ms=connections_max_idle_ms,
request_timeout_ms=request_timeout_ms,
**kwargs,
)

Expand Down Expand Up @@ -1433,7 +1467,6 @@ def test__settings_extra(self, *, producer, app):
app.in_transaction = False
assert producer._settings_extra() == {}

@pytest.mark.skip("fix me")
def test__new_producer(self, *, app):
producer = Producer(app.transport)
self.assert_new_producer(producer)
Expand All @@ -1442,8 +1475,8 @@ def test__new_producer(self, *, app):
"expected_args",
[
pytest.param(
{"api_version": "0.10"},
marks=pytest.mark.conf(producer_api_version="0.10"),
{"api_version": "auto"},
marks=pytest.mark.conf(producer_api_version="auto"),
),
pytest.param({"acks": -1}, marks=pytest.mark.conf(producer_acks="all")),
pytest.param(
Expand Down Expand Up @@ -1474,6 +1507,14 @@ def test__new_producer(self, *, app):
{"request_timeout_ms": 1234134000},
marks=pytest.mark.conf(producer_request_timeout=1234134),
),
pytest.param(
{"metadata_max_age_ms": 300000},
marks=pytest.mark.conf(metadata_max_age_ms=300000),
),
pytest.param(
{"connections_max_idle_ms": 540000},
marks=pytest.mark.conf(connections_max_idle_ms=540000),
),
pytest.param(
{
"security_protocol": "SASL_PLAINTEXT",
Expand All @@ -1490,7 +1531,6 @@ def test__new_producer(self, *, app):
),
],
)
@pytest.mark.skip("fix me")
def test__new_producer__using_settings(self, expected_args, *, app):
producer = Producer(app.transport)
self.assert_new_producer(producer, **expected_args)
Expand Down Expand Up @@ -1802,7 +1842,6 @@ async def test_on_start(
await threaded_producer.start()
await threaded_producer.stop()

@pytest.mark.skip("Needs fixing")
@pytest.mark.asyncio
async def test_on_thread_stop(
self,
Expand Down
Loading