
Race condition with entering client context and receiving QoS > 0 queued messages. #95

Closed
laundmo opened this issue Dec 10, 2021 · 14 comments

Comments

@laundmo
Contributor

laundmo commented Dec 10, 2021

I was trying to receive messages published while the client is offline, which MQTT supports if the following conditions are met:

  • Fixed client ID (as you've done)
  • Always connect with clean_session=False
  • Subscriptions must be made with QoS>0
  • Messages published must have QoS>0
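As a rough model of these conditions (a sketch based on MQTT 3.1.1 persistent-session semantics, not asyncio-mqtt code), a broker keeps a message for an offline client only when it can identify the session, the session is persistent, and the effective QoS (the lower of the subscription QoS and the publish QoS) is above 0. The helper name `will_be_queued` is hypothetical, and some brokers can optionally queue QoS 0 messages as well.

```python
def will_be_queued(fixed_client_id: bool, clean_session: bool,
                   sub_qos: int, pub_qos: int) -> bool:
    """Hypothetical helper modelling when a broker keeps a message for an offline client."""
    # A random client ID means a brand-new session on every connect,
    # so there is no stored session to deliver into.
    if not fixed_client_id:
        return False
    # clean_session=True tells the broker to discard the session on disconnect.
    if clean_session:
        return False
    # The message is delivered at the lower of the two QoS levels; only
    # QoS 1 and 2 messages are required to be stored for offline clients.
    return min(sub_qos, pub_qos) > 0


print(will_be_queued(True, False, 1, 1))  # True
print(will_be_queued(True, False, 0, 1))  # False: subscribed with QoS 0
print(will_be_queued(True, True, 1, 1))   # False: session is not persistent
```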

Now, this works when I use the simple example:

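import asyncio
import sys

from asyncio_mqtt import Client
from conf import *  # local settings: MQTTHOST, MQTTPORT, MQTTUSER, MQTTPWD, MQTTID

# asyncio-mqtt drives paho's socket with loop.add_reader/add_writer, which Windows' default Proactor loop lacks
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())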

async def main():
    async with Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    ) as client:
        async with client.filtered_messages("ame_test/data/#") as messages:
            await client.subscribe("ame_test/data/#", qos=1)
            async for message in messages:
                print(message.payload.decode())

asyncio.run(main())

This receives the messages that were published with QoS > 0 while the script was not running.
The problem was that I wasn't getting these messages in my larger program that uses this library. After a LOT of debugging, trying things, and banging my head against the desk, I found the cause: if there is any delay between entering the client context manager and entering the filtered_messages or unfiltered_messages context manager, the messages don't arrive. This seems to be because the paho client initially has an empty on_message handler.

Here's the exact same code as above, with an asyncio.sleep added to simulate the delay caused by other processes in my program. This does not receive the queued QoS > 0 messages.

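import asyncio
import sys

from asyncio_mqtt import Client
from conf import *  # local settings: MQTTHOST, MQTTPORT, MQTTUSER, MQTTPWD, MQTTID

# asyncio-mqtt drives paho's socket with loop.add_reader/add_writer, which Windows' default Proactor loop lacks
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())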

async def main():
    async with Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    ) as client:
        await asyncio.sleep(1)  # simulates the delay caused by other work in the larger program
        async with client.filtered_messages("ame_test/data/#") as messages:
            await client.subscribe("ame_test/data/#", qos=1)
            async for message in messages:
                print(message.payload.decode())

asyncio.run(main())

Note: sometimes the first few queued messages don't arrive in the first version either. I might have 5 queued and only get 3 or 4.

@frederikaalund
Collaborator

Hi laundmo. Thanks for opening this issue. Let me have a look. :)

I really appreciate the detailed and thorough bug report. Good job. 👍

> The problem was that I wasn't getting these messages in my larger program that uses this library. After a LOT of debugging, trying things, and banging my head against the desk, I found the cause: if there is any delay between entering the client context manager and entering the filtered_messages or unfiltered_messages context manager, the messages don't arrive. This seems to be because the paho client initially has an empty on_message handler.

Sorry to hear about your frustration with asyncio-mqtt. I'm really glad that you kept going and found the underlying issue. Let's try to solve it together.

The order of events (according to the current asyncio-mqtt recommendations):

  1. We create an asyncio_mqtt.Client object.
  2. We call Client.__aenter__. Internally, this calls paho's connect method.
  3. We call Client.filtered_messages (or unfiltered_messages). Internally, this sets the on_message callback in paho.

As you already figured out, this order of events is troublesome. Specifically, we may lose messages that arrive between (2) and (3). This is what you experienced in your application.
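To make the race concrete, here is a toy simulation in plain asyncio with no paho involved. `ToyClient`, `run`, and the message names are hypothetical. It assumes, as with paho, that queued messages start arriving in the background as soon as the connection is up, and that a message arriving while on_message is unset is simply dropped.

```python
import asyncio
from typing import Callable, List, Optional


class ToyClient:
    """Hypothetical stand-in for a paho-style client (not the real API)."""

    def __init__(self, queued: List[str]) -> None:
        self._queued = list(queued)  # messages the broker stored for this session
        self.on_message: Optional[Callable[[str], None]] = None  # starts empty, like paho
        self._task: Optional["asyncio.Task[None]"] = None

    def connect(self) -> None:
        # Delivery happens in the background, like paho's network loop.
        self._task = asyncio.create_task(self._deliver())

    async def _deliver(self) -> None:
        for payload in self._queued:
            await asyncio.sleep(0.01)
            if self.on_message is not None:
                self.on_message(payload)
            # With no handler set, the message is silently dropped.

    async def wait_delivered(self) -> None:
        assert self._task is not None
        await self._task


async def run(handler_first: bool, delay: float) -> List[str]:
    client = ToyClient(["m1", "m2", "m3", "m4", "m5"])
    received: List[str] = []
    if handler_first:
        client.on_message = received.append  # (3) before (2): nothing can slip through
    client.connect()                         # (2): queued messages start arriving
    await asyncio.sleep(delay)               # e.g. other startup work in a larger program
    if not handler_first:
        client.on_message = received.append  # (3) after (2): earlier messages are gone
    await client.wait_delivered()
    return received


print(asyncio.run(run(handler_first=False, delay=0.2)))  # []
print(asyncio.run(run(handler_first=True, delay=0.2)))   # ['m1', 'm2', 'm3', 'm4', 'm5']
```

With a shorter delay the late handler catches only the tail of the backlog, which matches the "5 queued, only 3 or 4 received" symptom.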

There are multiple solutions to this problem. I'll mention a couple below.

Change the asyncio-mqtt recommendation

In order to switch events (2) and (3) around, we change the asyncio-mqtt recommendation to something like this (adapted from your example code):

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

Pros: No change to the asyncio-mqtt API
Cons: Unintuitive?

Open issue in paho.mqtt

Basically, forward this issue upstream. Persuade paho.mqtt to add support for the (1), (2), (3) order of events. That is, paho buffers the messages up internally until we set on_message.

Pros: No change to asyncio-mqtt at all
Cons: This may take a long time to materialize (if ever). Maybe the paho.mqtt developers don't want to support this use case.

Buffer up messages in asyncio-mqtt

Basically, set a dummy on_message handler in asyncio_mqtt.Client.__init__. Buffer any messages that we receive before the user calls (un)filtered_messages, and release them when they do.

Pros: No change to the asyncio-mqtt API
Cons: This goes beyond the "simple wrapper around paho.mqtt" idea. Paho has many flaws, and this is just one of them. If we start to work around these flaws, the work never ends.
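A minimal sketch of the buffering option, assuming payloads arrive through a single callback. `BufferingDispatcher` and `attach` are hypothetical names, not asyncio-mqtt API.

```python
from typing import Callable, List, Optional


class BufferingDispatcher:
    """Hypothetical sketch of the 'buffer up messages' option (not asyncio-mqtt API)."""

    def __init__(self) -> None:
        self._backlog: List[str] = []
        self._handler: Optional[Callable[[str], None]] = None

    def on_message(self, payload: str) -> None:
        # Installed as paho's on_message right away, e.g. in Client.__init__.
        if self._handler is None:
            self._backlog.append(payload)  # nobody is listening yet: keep it
        else:
            self._handler(payload)

    def attach(self, handler: Callable[[str], None]) -> None:
        # Called when the user enters (un)filtered_messages: replay the backlog
        # in arrival order, then deliver new messages directly.
        backlog, self._backlog = self._backlog, []
        for payload in backlog:
            handler(payload)
        self._handler = handler


dispatcher = BufferingDispatcher()
dispatcher.on_message("m1")  # arrives right after connect, before any handler
dispatcher.on_message("m2")
received: List[str] = []
dispatcher.attach(received.append)
dispatcher.on_message("m3")
print(received)  # ['m1', 'm2', 'm3']
```

The backlog here is unbounded, and with several filtered handlers the replay would also have to respect each handler's topic filter, which illustrates the maintenance cost mentioned in the Cons.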


There are probably other solutions as well. I specifically tried to maneuver around breaking changes to the asyncio-mqtt API.

Personally, I lean toward the "Change the asyncio-mqtt recommendation" solution. We can achieve that quickly. What do you think? Do you have any other suggestions?

Thanks again for the thorough and detailed bug report. :) Keep it up. 👍

~Frederik

@laundmo
Contributor Author

laundmo commented Dec 13, 2021

Option 1: I'm not really sure about this; changing the recommended order like this makes the code less readable.

Option 2: I consider it very unlikely that paho will change anything, since their callback-based API intuitively leads to a structure where this isn't an issue: create the client object, assign callbacks, connect.

Option 3: I can understand not wanting to queue messages due to the complexity involved.

My idea:

When a client instance is made and the context manager entered, don't call paho connect just yet. Wait until the user has set and started reading at least one message handler, then connect the paho client. That way, the handler has to already exist when the connection is made.

Pros:

  • No breaking change to the asyncio-mqtt API, only a minor addition: an argument to Client.__init__ that enables this feature.
  • Intuitive when the flag is set.

Cons:

  • Can only define a single handler before connection.

The con could be mitigated by allowing the user to manually connect once all the handlers are defined, which would require the argument to have a 3rd value to stop automatic connection and let the user connect, though at that point using a context manager might be moot and the user might as well use connect and disconnect manually.
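Here is a rough, self-contained model of this proposal with the manual-connect mitigation. `DeferredClient`, `auto_connect`, and `add_handler` are hypothetical names used for illustration, and the broker is simulated by a list of messages that get flushed on connect.

```python
import asyncio
from typing import Callable, List


class DeferredClient:
    """Hypothetical model of the proposal; not asyncio-mqtt API."""

    def __init__(self, queued: List[str], auto_connect: bool = True) -> None:
        self._queued = list(queued)        # simulated broker backlog for this session
        self._auto_connect = auto_connect  # hypothetical flag; False = user connects manually
        self._handlers: List[Callable[[str], None]] = []

    async def __aenter__(self) -> "DeferredClient":
        if self._auto_connect:
            await self.connect()           # current behaviour: connect on enter
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        return None

    def add_handler(self, handler: Callable[[str], None]) -> None:
        self._handlers.append(handler)

    async def connect(self) -> None:
        # The broker flushes the backlog as soon as the session resumes;
        # only handlers registered by now will see it.
        for payload in self._queued:
            for handler in self._handlers:
                handler(payload)
        self._queued.clear()


async def demo(auto_connect: bool) -> List[str]:
    received: List[str] = []
    async with DeferredClient(["m1", "m2"], auto_connect=auto_connect) as client:
        client.add_handler(received.append)  # handler registered after entering
        if not auto_connect:
            await client.connect()           # manual connect once handlers exist
    return received


print(asyncio.run(demo(auto_connect=True)))   # []
print(asyncio.run(demo(auto_connect=False)))  # ['m1', 'm2']
```

Because any number of handlers can be registered before the explicit connect, this variant avoids the single-handler limitation listed in the Cons.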

@frederikaalund
Collaborator

> When a client instance is made and the context manager entered, don't call paho connect just yet. Wait until the user has set and started reading at least one message handler, then connect the paho client. That way, the handler has to already exist when the connection is made.

I like this idea. Though I don't like the con that you mention:

> Can only define a single handler before connection.

That makes this keyword argument very special-purpose. I'd like to avoid that. It's a rabbit hole (difficult to maintain).

I like your mitigation idea:

> The con could be mitigated by allowing the user to manually connect once all the handlers are defined, which would require the argument to have a 3rd value to stop automatic connection and let the user connect, though at that point using a context manager might be moot and the user might as well use connect and disconnect manually.

You propose something like this, right (correct me if I'm wrong):

async with Client(..., skip_connect_on_init=True) as client:  # Don't connect right away
    async with client.filtered_messages("ame_test/data/#") as messages:
        await client.connect()  # Connect manually after we set up all message handlers
        await client.subscribe("ame_test/data/#", qos=1)  # Subscribing requires a connection
        async for message in messages:
            print(message.payload.decode())

@laundmo
Contributor Author

laundmo commented Jan 16, 2022

Yes, the last example should allow for receiving all queued QoS > 0 messages. If that's a change you're willing to accept, I could open a PR for it. It shouldn't be too large a change.

@frederikaalund
Collaborator

Hereby approved. 👍 I'll try to find the time to review your PR. If I don't respond right away, just ping me.

Sorry about the month-long wait time on this issue.

@germanh1982

Change the asyncio-mqtt recommendation

In order to switch events (2) and (3) around, we change the asyncio-mqtt recommendation to something like this (adapted from your example code):

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

Pros: No change to the asyncio-mqtt API
Cons: Unintuitive?

Hi @frederikaalund

I'm having trouble making it work as per your suggestion.
The following code raises an exception:

from asyncio_mqtt import Client
import asyncio

async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    messages = client.filtered_messages("topic")
    async with messages, client:
        await client.subscribe("topic", qos=1)
        async for message in messages:
            print(message.payload.decode())

asyncio.run(coro())

The exception is:

    async for message in messages:
TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager

Whatever I do, I can't seem to get past this error. Can you point out what I'm doing wrong?

Please excuse me for reviving this old thread; I thought this would be the most relevant place to post this question.
Last but not least, thank you for this library and your support.
Cheers!
German

@germanh1982

Sorry, I just found the other issue and this comment (#109 (comment)) and got it working without losing any messages:

from asyncio_mqtt import Client
import asyncio

async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    async with client.filtered_messages("topic") as messages:
        async with client:
            await client.subscribe("topic", qos=1)
            async for message in messages:
                print(message.payload.decode())

asyncio.run(coro())

Thanks again!
German

@YAGregor
Contributor

I've run into the same problem. Of these solutions, I support buffering up messages in asyncio-mqtt. It's a flaw of paho rather than a bug, but I (as a user) think that correctness is more important than simplicity.

Considering this, the API might need to change, though that would mean a lot of work, and a breaking API change might be painful.

@YAGregor
Contributor

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

There is a small mistake:

async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Register message handler (3) and connect the client (2)
    async with client.filtered_messages("ame_test/data/#") as messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())

This should fix it, and it's less nested.

@frederikaalund
Collaborator

> there is a small mistake,

Thanks for chipping in! 👍 Though maybe I'm missing something but the two code snippets seem semantically identical to me. What's the intended "fix"?

@YAGregor
Contributor

> there is a small mistake,

> Thanks for chipping in! 👍 Though maybe I'm missing something but the two code snippets seem semantically identical to me. What's the intended "fix"?

Just a syntax error. When I tried "async with messages, client:", I got the exception TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager

```python
async with messages as msg_iter:  # messages is the context manager; msg_iter is the async iterable
    pass
```

You must use "as" with client.messages() to bind what __aenter__ returns. That is different from "async with client", because Client.__aenter__ returns the client itself.
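The same indirection can be reproduced with the standard library alone. In this sketch, `filtered_messages` is a hypothetical stand-in built with contextlib.asynccontextmanager; its return value is an _AsyncGeneratorContextManager, which has no __aiter__, so iterating it directly raises the TypeError from this thread, while binding the result of __aenter__ with "as" works.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List, Tuple


@asynccontextmanager
async def filtered_messages() -> AsyncIterator[AsyncIterator[str]]:
    """Hypothetical stand-in for client.filtered_messages(...)."""
    async def generator() -> AsyncIterator[str]:
        for payload in ("m1", "m2"):
            yield payload
    yield generator()  # this is what __aenter__ returns, i.e. what "as" binds


async def main() -> Tuple[bool, List[str]]:
    raised = False
    cm = filtered_messages()
    try:
        async with cm:              # like "async with messages, client:"
            async for _ in cm:      # iterates the context manager itself
                pass
    except TypeError:
        raised = True               # _AsyncGeneratorContextManager has no __aiter__

    received: List[str] = []
    async with filtered_messages() as messages:  # bind the async iterator
        async for payload in messages:
            received.append(payload)
    return raised, received


print(asyncio.run(main()))  # (True, ['m1', 'm2'])
```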

@frederikaalund
Collaborator

> just a syntax error, I tried the async with messages, client got exception TypeError: 'async for' requires an object with __aiter__ method, got _AsyncGeneratorContextManager

That makes sense! I forgot about that level of indirection. Thanks for the correction. 👍

@empicano
Owner

This is a very interesting discussion 👍 In 2.0.0 we removed filtered_messages(), unfiltered_messages(), and messages() in favor of a single client-wide queue. Incidentally, this means that we now set paho's _on_message callback already inside the client's __init__ method, which was one of the possible solutions discussed here.

This example does not lose messages, even though we sleep before iterating over client.messages:

import asyncio
import aiomqtt


async def main():
    client = aiomqtt.Client(
        hostname="test.mosquitto.org",
        identifier="example",
        clean_session=False,
    )
    # Initial subscription with QoS=1
    async with client:
        await client.subscribe("foo", qos=1)
    # Publish messages with QoS=1 and unrelated client
    async with aiomqtt.Client("test.mosquitto.org") as publisher:
        await publisher.publish("foo", 1, qos=1)
        await publisher.publish("foo", 2, qos=1)
        await publisher.publish("foo", 3, qos=1)
    # Receive messages with the initial client after reconnection and sleep
    async with client:
        await asyncio.sleep(2)
        async for message in client.messages:
            print(message.payload)


asyncio.run(main())
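The mechanism behind this can be modelled without a broker. The sketch below is hypothetical and much simpler than aiomqtt's actual implementation: the callback that would be handed to paho is bound in __init__ and only pushes into one asyncio.Queue, so messages that arrive before anyone iterates simply wait there. `QueueClient` is an illustrative name, and the sketch assumes the callback runs on the event loop thread.

```python
import asyncio
from typing import AsyncIterator, List


class QueueClient:
    """Hypothetical sketch of a client-wide message queue (not aiomqtt's code)."""

    def __init__(self) -> None:
        # Create inside a running loop; on Python < 3.10 an asyncio.Queue binds
        # to the event loop current at construction time.
        self._queue: "asyncio.Queue[str]" = asyncio.Queue()
        # Wired up before any connect, so no message can find an empty callback.
        self.on_message = self._queue.put_nowait

    async def _iterate(self) -> AsyncIterator[str]:
        while True:
            yield await self._queue.get()

    @property
    def messages(self) -> AsyncIterator[str]:
        return self._iterate()


async def main() -> List[str]:
    client = QueueClient()
    for payload in ("1", "2", "3"):
        client.on_message(payload)  # simulate messages arriving right after connect
    await asyncio.sleep(0.1)        # the consumer starts late, as in the example above
    received: List[str] = []
    async for payload in client.messages:
        received.append(payload)
        if len(received) == 3:
            break
    return received


print(asyncio.run(main()))  # ['1', '2', '3']
```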

Let me know if that solves this issue! 😊

@empicano
Owner

empicano commented Feb 4, 2024

If you notice that this issue is not solved after all, please reopen!

@empicano empicano closed this as completed Feb 4, 2024