Skip to content

Commit

Permalink
FIX Record instances deserialize properly when returned by agent.ask (#…
Browse files Browse the repository at this point in the history
…152)

* FIX Record instances deserialize properly when returned by agent.ask

Faust uses an internal model store to keep track of classes deriving from ```Record```.
When sent via Kafka those classes are transparently reconstructed from the deserialized data.
The corresponding logic resides in ```faust.models.base.maybe_model``` which must be used to wrap
the reply value of ```faust.agents.replies.ReplyConsumer._drain_replies```.

* fixed linting

Co-authored-by: tariq <tariq@attariq.de>
  • Loading branch information
tarbaig and tariq authored May 28, 2021
1 parent bef8710 commit df0856f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
4 changes: 3 additions & 1 deletion faust/agents/replies.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

__all__ = ["ReplyPromise", "BarrierState", "ReplyConsumer"]

from ..models import maybe_model


class ReplyTuple(NamedTuple):
correlation_id: str
Expand Down Expand Up @@ -181,7 +183,7 @@ async def _start_fetcher(self, topic_name: str) -> None:
async def _drain_replies(self, channel: ChannelT) -> None:
async for reply in channel.stream():
for promise in self._waiting[reply.correlation_id]:
promise.fulfill(reply.correlation_id, reply.value)
promise.fulfill(reply.correlation_id, maybe_model(reply.value))

def _reply_topic(self, topic: str) -> TopicT:
return self.app.topic(
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/agents/test_replies.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import asyncio
import json

import pytest
from mode.utils.mocks import AsyncMock, Mock

from faust import Record
from faust.agents.models import ReqRepResponse
from faust.agents.replies import BarrierState, ReplyConsumer, ReplyPromise


class Account(Record, serializer="json"):
id: str
name: str
active: bool = True


def test_ReplyPromise():
r = ReplyPromise(reply_to="rt", correlation_id="id1")
assert r.reply_to == "rt"
Expand Down Expand Up @@ -207,24 +215,35 @@ async def test_start_fetcher(self, *, c):

@pytest.mark.asyncio
async def test_drain_replies(self, *, c):
an_account = Account(id="1", name="aName", active=False)
responses = [
ReqRepResponse(key="key1", value="value1", correlation_id="id1"),
ReqRepResponse(key="key2", value="value2", correlation_id="id2"),
ReqRepResponse.from_data(
json.loads(
ReqRepResponse(
key="key3", value=an_account, correlation_id="id3"
).dumps()
)
),
]
channel = Mock(
stream=Mock(return_value=self._response_stream(responses)),
)
p1 = Mock()
p2 = Mock()
p3 = Mock()
p4 = Mock()
c._waiting["id1"] = {p1, p2}
c._waiting["id2"] = {p3}
c._waiting["id3"] = {p4}

await c._drain_replies(channel)

p1.fulfill.assert_called_once_with("id1", "value1")
p2.fulfill.assert_called_once_with("id1", "value1")
p3.fulfill.assert_called_once_with("id2", "value2")
p4.fulfill.assert_called_once_with("id3", an_account)

async def _response_stream(self, responses):
for response in responses:
Expand Down

0 comments on commit df0856f

Please sign in to comment.