diff --git a/faust/agents/agent.py b/faust/agents/agent.py index 1f7060a11..9ebd896af 100644 --- a/faust/agents/agent.py +++ b/faust/agents/agent.py @@ -661,7 +661,9 @@ async def _prepare_actor(self, aref: ActorRefT, beacon: NodeT) -> ActorRefT: else: # agent yields and is an AsyncIterator so we have to consume it. coro = self._slurp(aref, aiter(aref)) - task = asyncio.Task(self._execute_actor(coro, aref), loop=self.loop) + # Calling asyncio.Task is not proper usage of asyncio, + # we need to create the task directly from the loop + task = self.loop.create_task(self._execute_actor(coro, aref)) task._beacon = beacon # type: ignore aref.actor_task = task self._actors.add(aref) diff --git a/tests/unit/agents/test_agent.py b/tests/unit/agents/test_agent.py index bb31b75b8..e58d876ee 100644 --- a/tests/unit/agents/test_agent.py +++ b/tests/unit/agents/test_agent.py @@ -392,22 +392,23 @@ async def test_start_task(self, *, agent): assert ret is agent._prepare_actor.return_value @pytest.mark.asyncio - async def test_prepare_actor__AsyncIterable(self, *, agent): + async def test_prepare_actor__AsyncIterable(self, *, agent, monkeypatch): + async def mock_execute_actor(coro, aref): + await coro + + mock_beacon = Mock(name="beacon", autospec=Node) + mock_slurp = AsyncMock(name="slurp") + monkeypatch.setattr(agent, "_slurp", mock_slurp) + monkeypatch.setattr(agent, "_execute_actor", mock_execute_actor) aref = agent(index=0, active_partitions=None) - with patch("asyncio.Task") as Task: - agent._slurp = Mock(name="_slurp") - agent._execute_actor = Mock(name="_execute_actor") - beacon = Mock(name="beacon", autospec=Node) - ret = await agent._prepare_actor(aref, beacon) - agent._slurp.assert_called() - coro = agent._slurp() - agent._execute_actor.assert_called_once_with(coro, aref) - Task.assert_called_once_with(agent._execute_actor(), loop=agent.loop) - task = Task() - assert task._beacon is beacon - assert aref.actor_task is task - assert aref in agent._actors - assert ret is aref + ret = await agent._prepare_actor(aref, mock_beacon) + task = aref.actor_task + await task + mock_slurp.assert_awaited() + assert mock_slurp.await_args.args[0] is aref + assert task._beacon is mock_beacon + assert aref in agent._actors + assert ret is aref @pytest.mark.asyncio async def test_prepare_actor__Awaitable(self, *, agent2): @@ -428,22 +429,24 @@ async def test_prepare_actor__Awaitable(self, *, agent2): assert ret is aref @pytest.mark.asyncio - async def test_prepare_actor__Awaitable_with_multiple_topics(self, *, agent2): + async def test_prepare_actor__Awaitable_with_multiple_topics( + self, *, agent2, monkeypatch + ): aref = agent2(index=0, active_partitions=None) - asyncio.ensure_future(aref.it).cancel() # silence warning agent2.channel.topics = ["foo", "bar"] - with patch("asyncio.Task") as Task: - agent2._execute_actor = Mock(name="_execute_actor") - beacon = Mock(name="beacon", autospec=Node) - ret = await agent2._prepare_actor(aref, beacon) - coro = aref - agent2._execute_actor.assert_called_once_with(coro, aref) - Task.assert_called_once_with(agent2._execute_actor(), loop=agent2.loop) - task = Task() - assert task._beacon is beacon - assert aref.actor_task is task - assert aref in agent2._actors - assert ret is aref + mock_beacon = Mock(name="beacon", autospec=Node) + mock_slurp = AsyncMock(name="slurp") + mock_execute_actor = AsyncMock(name="execute_actor") + monkeypatch.setattr(agent2, "_slurp", mock_slurp) + monkeypatch.setattr(agent2, "_execute_actor", mock_execute_actor) + ret = await agent2._prepare_actor(aref, mock_beacon) + task = aref.actor_task + mock_slurp.assert_not_called() + mock_slurp.assert_not_awaited() + mock_execute_actor.assert_called_with(aref, aref) + assert task._beacon is mock_beacon + assert aref in agent2._actors + assert ret is aref @pytest.mark.asyncio async def test_prepare_actor__Awaitable_cannot_have_sinks(self, *, agent2):