Race condition with entering client context and receiving QoS > 0 queued messages. #95
Comments
Hi laundmo. Thanks for opening this issue. Let me have a look. :) I really appreciate the detailed and thorough bug report here. Good job. 👍
Sorry to hear about your frustration with asyncio-mqtt. I'm really glad that you kept going and found the underlying issue. Let's try to solve it together. The order of events (according to the current asyncio-mqtt recommendations) is: (1) create the client object, (2) connect the client, (3) register the message handler.
As you already figured out, this order of events is troublesome. Specifically, we may lose messages between (2) and (3). This is what you experienced in your application. There are multiple solutions to this problem. I'll mention a couple below.

Change the asyncio-mqtt recommendation

In order to switch events (2) and (3) around, we change the asyncio-mqtt recommendation to something like this (adapted from your example code):

```python
async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
```

Pros: No change to the asyncio-mqtt API

Open issue in paho.mqtt

Basically, forward this issue upstream. Persuade paho.mqtt to add support for the (1), (2), (3) order of events. That is, paho buffers the messages up internally until we set a message handler.

Pros: No change to asyncio-mqtt at all

Buffer up messages on asyncio-mqtt

Basically, set a dummy message handler that buffers the messages.

Pros: No change to the asyncio-mqtt API

There are probably other solutions as well. I specifically tried to maneuver around breaking changes to the asyncio-mqtt API. Personally, I lean towards the "Change the asyncio-mqtt recommendation" solution. We can achieve that quickly. What do you think? Do you have any other suggestions? Thanks again for the thorough and detailed bug report. :) Keep it up. 👍 ~Frederik
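To make the "buffer up messages" option a bit more concrete, here is a minimal sketch of the idea in plain Python. It is not asyncio-mqtt or paho code: `BufferingDispatcher`, `set_handler`, and the `maxlen` bound are hypothetical names chosen for illustration. The assumption is that the dispatcher's `on_message` would be installed as the callback before connecting, so early messages are kept until a real handler shows up.

```python
from collections import deque


class BufferingDispatcher:
    """Hypothetical helper; not part of asyncio-mqtt or paho."""

    def __init__(self, maxlen=1000):
        # Bounded buffer: once full, the oldest buffered message is discarded.
        self._buffer = deque(maxlen=maxlen)
        self._handler = None

    def on_message(self, payload):
        # Assumed to be installed as the callback before the client connects.
        if self._handler is None:
            self._buffer.append(payload)
        else:
            self._handler(payload)

    def set_handler(self, handler):
        # Replay buffered messages in arrival order, then deliver directly.
        self._handler = handler
        while self._buffer:
            handler(self._buffer.popleft())


dispatcher = BufferingDispatcher()
dispatcher.on_message("early-1")  # arrives before any handler exists
dispatcher.on_message("early-2")

seen = []
dispatcher.set_handler(seen.append)  # buffered messages are replayed here
dispatcher.on_message("late")        # delivered directly
```

The bounded buffer is one possible design choice: it caps memory use at the cost of dropping the oldest messages if no handler is ever registered.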
Option 1: I'm not really sure about this; changing the recommended order like this makes the code less readable.

Option 2: I consider it very unlikely that paho will change anything, since their callback-based API intuitively leads to a structure where this won't be an issue: create client object, assign callbacks, connect.

Option 3: I can understand not wanting to queue messages due to the complexity involved.

My idea: when a client instance is made and the context manager entered, don't call paho connect just yet. Wait until the user has set and started reading at least one message handler, then connect the paho client. That way, the handler has to already exist when the connection is made. Pros:
Cons:
The con could be mitigated by allowing the user to connect manually once all the handlers are defined. That would require the argument to have a third value that stops automatic connection and lets the user connect, though at that point using a context manager might be moot and the user might as well use connect and disconnect manually.
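The deferred-connect idea from this comment can be modelled with a toy client. This is only a sketch under assumptions: `DeferredConnectClient` and `add_handler` are hypothetical names, no network or paho call is involved, and "connecting" is just a flag plus a log entry so the ordering is visible.

```python
import asyncio


class DeferredConnectClient:
    """Toy model of the proposal; all names here are hypothetical."""

    def __init__(self):
        self.connected = False
        self.handlers = []
        self.log = []

    async def __aenter__(self):
        # Deliberately does not connect when the context is entered.
        return self

    async def __aexit__(self, *exc_info):
        if self.connected:
            self.connected = False
            self.log.append("disconnect")

    async def add_handler(self, handler):
        self.handlers.append(handler)
        self.log.append("handler")
        if not self.connected:
            # The first handler now exists, so connecting cannot race with it.
            self.connected = True
            self.log.append("connect")


async def demo():
    async with DeferredConnectClient() as client:
        await asyncio.sleep(0)  # arbitrary delay; nothing can be lost yet
        await client.add_handler(print)
        await client.add_handler(print)
    return client.log


event_log = asyncio.run(demo())
```

In this model the first handler registration always precedes the connection, which is the property the proposal is after.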
I like this idea. Though I don't like the con that you mention:
That makes this keyword argument very special-purpose. I'd like to avoid that. It's a rabbit hole (difficult to maintain). I like your mitigation idea:
You propose something like this, right? (Correct me if I'm wrong.)

```python
async with Client(..., skip_connect_on_init=True) as client:  # Don't connect right away
    async with client.filtered_messages("ame_test/data/#") as messages:
        await client.connect()  # Connect manually after we set up all message handlers
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
```
Yes, the last example should allow for receiving all QoS messages. If that's a change you're willing to accept, I could PR that. It shouldn't be too large of a change.
Hereby approved. 👍 I'll try to find the time to review your PR. If I don't respond right away, then just ping me. Sorry about the month-long wait time on this issue.
I'm having trouble making it work as per your suggestion.

```python
from asyncio_mqtt import Client
import asyncio


async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    messages = client.filtered_messages("topic")
    async with messages, client:
        await client.subscribe("topic", qos=1)
        async for message in messages:
            print(message.payload.decode())


asyncio.run(coro())
```

The exception is:
Whatever I do, I can't seem to overcome this error. Can you point out what I am doing wrong? Please excuse me for reviving this old thread; I thought this would be the most relevant place to post this question.
Sorry, I just found the other issue and this comment (#109 (comment)) and got it working without losing any messages:

```python
from asyncio_mqtt import Client
import asyncio


async def coro():
    client = Client('localhost', client_id='cliente', clean_session=False)
    async with client.filtered_messages("topic") as messages:
        async with client:
            await client.subscribe("topic", qos=1)
            async for message in messages:
                print(message.payload.decode())


asyncio.run(coro())
```

Thanks again!
I encountered the same problem. Of these solutions, I support buffering up messages in asyncio-mqtt. This is a flaw of paho rather than a bug, but as a user I think correctness is more important than simplicity. That said, the API would probably have to change, which means a lot of work, and a breaking API change might be painful.
```python
async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Add message handler
    messages = client.filtered_messages("ame_test/data/#")
    # Register message handler (3) and connect the client (2)
    async with messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
```

There is a small mistake.

```python
async def main():
    # Create client object (1)
    client = Client(
        MQTTHOST,
        MQTTPORT,
        username=MQTTUSER,
        password=MQTTPWD,
        client_id=MQTTID,
        clean_session=False,
    )
    # Register message handler (3) and connect the client (2)
    async with client.filtered_messages("ame_test/data/#") as messages, client:
        await asyncio.sleep(1)  # We can now sleep for as long as we want since we fixed the race condition
        await client.subscribe("ame_test/data/#", qos=1)
        async for message in messages:
            print(message.payload.decode())
```

This should fix it, and it's less nested.
Thanks for chipping in! 👍 Maybe I'm missing something, but the two code snippets seem semantically identical to me. What's the intended "fix"?
Just a syntax error; I tried the code.

```python
async with messages as msg_iter:  # messages is a context manager and msg_iter is an async iterable
    pass
```

You must use `as` to get the async iterable.
That makes sense! I forgot about that level of indirection. Thanks for the correction. 👍
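The level of indirection discussed here can be reproduced without any MQTT library. The sketch below uses `contextlib.asynccontextmanager` with a hypothetical `messages()` stand-in (not the asyncio-mqtt implementation) to show that the context manager object itself cannot be used in `async for`, while the object bound by `as` can.

```python
import asyncio
from contextlib import asynccontextmanager


async def _numbers():
    for n in (1, 2, 3):
        yield n


@asynccontextmanager
async def messages():
    # Hypothetical stand-in: a context manager that hands out an async iterable.
    yield _numbers()


async def demo():
    manager = messages()
    try:
        async for _ in manager:  # iterating the manager itself
            pass
    except TypeError:
        failed = True  # the manager has no __aiter__ method
    else:
        failed = False

    collected = []
    async with messages() as msg_iter:  # iterate what `as` binds instead
        async for n in msg_iter:
            collected.append(n)
    return failed, collected


result = asyncio.run(demo())
```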
This is a very interesting discussion 👍 In 2.0.0 we removed the message-handler context managers. This example does not lose messages, even though we sleep before entering the message loop:

```python
import asyncio
import aiomqtt


async def main():
    client = aiomqtt.Client(
        hostname="test.mosquitto.org",
        identifier="example",
        clean_session=False,
    )
    # Initial subscription with QoS=1
    async with client:
        await client.subscribe("foo", qos=1)
    # Publish messages with QoS=1 and unrelated client
    async with aiomqtt.Client("test.mosquitto.org") as publisher:
        await publisher.publish("foo", 1, qos=1)
        await publisher.publish("foo", 2, qos=1)
        await publisher.publish("foo", 3, qos=1)
    # Receive messages with the initial client after reconnection and sleep
    async with client:
        await asyncio.sleep(2)
        async for message in client.messages:
            print(message.payload)


asyncio.run(main())
```

Let me know if that solves this issue! 😊
If you notice that this issue is not solved after all, please reopen! |
I was trying to receive messages published while the client is offline, which MQTT supports if the following conditions are fulfilled:
Now, this works when I use the simple example:
This receives the messages that were published with QoS > 0 while the script was not running.
The issue was that I wasn't getting the messages in my larger program using this library. After a LOT of debugging, trying things, and banging my head against the desk, I found out what the issue is: if there is any delay between entering the client context manager and entering the filtered_message or unfiltered_message context manager, the messages don't arrive. This seems to be because the paho client initially has an empty on_message handler.
Here's the exact same code as above, with an asyncio.sleep added to simulate the delay that was caused by other processes in my program. This does not receive the queued QoS > 0 messages.
Note: sometimes the first few queued messages don't arrive in the first version either. I might have 5 queued and only get 3 or 4.
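The race described above can be modelled without a broker. The following is a toy simulation, not paho or asyncio-mqtt code: `ToyClient` and `run` are hypothetical, and the assumption is that the broker flushes its queued messages right after the connection is established, while messages that arrive with no handler set are silently dropped.

```python
import asyncio


class ToyClient:
    """Stand-in for a callback-based client; NOT the paho API."""

    def __init__(self, queued):
        self._queued = list(queued)  # messages held while we were offline
        self.on_message = None       # no handler registered initially

    async def connect(self):
        # Assumption: the queue is flushed as soon as the session resumes.
        asyncio.get_running_loop().call_soon(self._flush)

    def _flush(self):
        for payload in self._queued:
            if self.on_message is not None:
                self.on_message(payload)
            # With no handler set, the message is silently dropped.
        self._queued.clear()


async def run(handler_first):
    received = []
    client = ToyClient(["a", "b", "c"])
    if handler_first:
        client.on_message = received.append
    await client.connect()
    await asyncio.sleep(0.01)  # other work; queued messages are delivered meanwhile
    if not handler_first:
        client.on_message = received.append  # too late in this model
    await asyncio.sleep(0.01)
    return received


with_handler_first = asyncio.run(run(handler_first=True))
with_handler_late = asyncio.run(run(handler_first=False))
```

In this model, registering the handler before connecting receives every queued message, while registering it after any delay receives none.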