Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use loop.create_task() for agent tasks #598

Merged
merged 9 commits into from
Jan 3, 2024
4 changes: 3 additions & 1 deletion faust/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,9 @@ async def _prepare_actor(self, aref: ActorRefT, beacon: NodeT) -> ActorRefT:
else:
# agent yields and is an AsyncIterator so we have to consume it.
coro = self._slurp(aref, aiter(aref))
task = asyncio.Task(self._execute_actor(coro, aref), loop=self.loop)
# Calling asyncio.Task is not proper usage of asyncio,
# we need to create the task directly from the loop
task = self.loop.create_task(self._execute_actor(coro, aref))
task._beacon = beacon # type: ignore
aref.actor_task = task
self._actors.add(aref)
Expand Down
61 changes: 32 additions & 29 deletions tests/unit/agents/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,22 +392,23 @@ async def test_start_task(self, *, agent):
assert ret is agent._prepare_actor.return_value

@pytest.mark.asyncio
async def test_prepare_actor__AsyncIterable(self, *, agent):
async def test_prepare_actor__AsyncIterable(self, *, agent, monkeypatch):
async def mock_execute_actor(coro, aref):
await coro

mock_beacon = Mock(name="beacon", autospec=Node)
mock_slurp = AsyncMock(name="slurp")
monkeypatch.setattr(agent, "_slurp", mock_slurp)
monkeypatch.setattr(agent, "_execute_actor", mock_execute_actor)
aref = agent(index=0, active_partitions=None)
with patch("asyncio.Task") as Task:
agent._slurp = Mock(name="_slurp")
agent._execute_actor = Mock(name="_execute_actor")
beacon = Mock(name="beacon", autospec=Node)
ret = await agent._prepare_actor(aref, beacon)
agent._slurp.assert_called()
coro = agent._slurp()
agent._execute_actor.assert_called_once_with(coro, aref)
Task.assert_called_once_with(agent._execute_actor(), loop=agent.loop)
task = Task()
assert task._beacon is beacon
assert aref.actor_task is task
assert aref in agent._actors
assert ret is aref
ret = await agent._prepare_actor(aref, mock_beacon)
task = aref.actor_task
await task
mock_slurp.assert_awaited()
assert mock_slurp.await_args.args[0] is aref
assert task._beacon is mock_beacon
assert aref in agent._actors
assert ret is aref

@pytest.mark.asyncio
async def test_prepare_actor__Awaitable(self, *, agent2):
Expand All @@ -428,22 +429,24 @@ async def test_prepare_actor__Awaitable(self, *, agent2):
assert ret is aref

@pytest.mark.asyncio
async def test_prepare_actor__Awaitable_with_multiple_topics(self, *, agent2):
async def test_prepare_actor__Awaitable_with_multiple_topics(
self, *, agent2, monkeypatch
):
aref = agent2(index=0, active_partitions=None)
asyncio.ensure_future(aref.it).cancel() # silence warning
agent2.channel.topics = ["foo", "bar"]
with patch("asyncio.Task") as Task:
agent2._execute_actor = Mock(name="_execute_actor")
beacon = Mock(name="beacon", autospec=Node)
ret = await agent2._prepare_actor(aref, beacon)
coro = aref
agent2._execute_actor.assert_called_once_with(coro, aref)
Task.assert_called_once_with(agent2._execute_actor(), loop=agent2.loop)
task = Task()
assert task._beacon is beacon
assert aref.actor_task is task
assert aref in agent2._actors
assert ret is aref
mock_beacon = Mock(name="beacon", autospec=Node)
mock_slurp = AsyncMock(name="slurp")
mock_execute_actor = AsyncMock(name="execute_actor")
monkeypatch.setattr(agent2, "_slurp", mock_slurp)
monkeypatch.setattr(agent2, "_execute_actor", mock_execute_actor)
ret = await agent2._prepare_actor(aref, mock_beacon)
task = aref.actor_task
mock_slurp.assert_not_called()
mock_slurp.assert_not_awaited()
mock_execute_actor.assert_called_with(aref, aref)
assert task._beacon is mock_beacon
assert aref in agent2._actors
assert ret is aref

@pytest.mark.asyncio
async def test_prepare_actor__Awaitable_cannot_have_sinks(self, *, agent2):
Expand Down
Loading