Messages losts #124

Closed
JoseAntonioTorre opened this issue Aug 15, 2022 · 3 comments

Comments

@JoseAntonioTorre

Hi, first of all, thanks for your fantastic library.
I have been working with it for a few days and have found that in some situations messages are "lost". I put "lost" in quotes because, after days of debugging, I can see that the paho mqtt logger shows the incoming message, but my handling code is not executed.

I followed the README to design the handling part using filtered messages, as shown here:

    async with mqtt_client.client.filtered_messages(self.sub_topic) as messages:
        logging.debug("I am inside the yield generator")
        async for message in messages:
            logger.info(f"{self.device.id} Message: {message.payload.decode('utf-8')}")
            try:
                if self._check_filter(message):
                    logger.info(f"{self.device.id}: Filter passed")
                    correct = True
                    break
                else:
                    correct = False
                    logger.info(f"{self.device.id}: Ignoring")
            except Exception as Ex:
                logger.warning("Exception ")
                logger.warning(Ex)

When a message is lost, I see that "I am inside the yield generator" is printed, but the log inside the async for is never printed.
After several hours of debugging and adding print statements inside paho and asyncio-mqtt, I have seen that every time I lose a message this pattern appears:

DEBUG:mqtt:Sending PUBLISH (d0, q0, r0, m65), 'b'sensor/state'', ... (86 bytes)
DEBUG:mqtt:Received PUBLISH (d0, q0, r0, m0), 'sensor/ack', ...  (41 bytes)
Before line 3555 paho mqtt client.py, before for iterating over on_message_callbacks
It seems that on_message_callbacks for sensor/ack is empty len(on_message_callbacks) == 0
After line 3580 paho mqtt client.py, after iterating over on_message_callbacks
Adding a callback for sensor/ack line 2318 paho mqtt client.py 
Added a callback for sensor/ack line 2320 paho mqtt client.py

As can be seen, I added a lot of prints inside paho and asyncio-mqtt. What I observed is that paho's message handler runs before asyncio-mqtt adds the callback inside filtered_messages(). By the time the callback is added, the message has already been processed, so my async for message in messages loop never executes, because the callback that puts the message on the queue is never called.
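The ordering problem can be reproduced without a broker. The following is only a minimal sketch: FakeDispatcher is a hypothetical stand-in for a per-topic callback table, not paho's or asyncio-mqtt's real code, and it assumes that a message with no registered callback is simply dropped, as the log above suggests.

```python
import asyncio
from collections import defaultdict


class FakeDispatcher:
    """Hypothetical per-topic callback table (not paho's real API)."""

    def __init__(self):
        self.callbacks = defaultdict(list)

    def add_callback(self, topic, callback):
        self.callbacks[topic].append(callback)

    def handle_message(self, topic, payload):
        # Call every callback registered for the topic and report how many ran.
        # With no callback registered, the message is silently dropped.
        registered = self.callbacks[topic]
        for callback in registered:
            callback(payload)
        return len(registered)


async def main():
    dispatcher = FakeDispatcher()
    queue = asyncio.Queue()

    # The message arrives first: no callback yet, so nothing is queued.
    delivered_early = dispatcher.handle_message("sensor/ack", b"ack-1")

    # Registering afterwards does not replay the earlier message.
    dispatcher.add_callback("sensor/ack", queue.put_nowait)
    delivered_late = dispatcher.handle_message("sensor/ack", b"ack-2")

    return delivered_early, delivered_late, queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch only the second message reaches the queue, which matches the symptom: the consumer loop is entered but never sees the first message.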

Hope this is useful for debugging it.

@germanh1982

I think you may be hitting the problem described in issue #95. Any delay between opening the Client context and calling filtered_messages() or unfiltered_messages() results in messages that the consumer never handles.

@frederikaalund
Collaborator

I think that @germanh1982 is right. 👍

@JoseAntonioTorre, can you try to apply one of the workarounds/fixes from #95 and see if it resolves this issue?

@JoseAntonioTorre
Author

Hi, thanks for your suggestions. I will try them and see if they work.
While waiting for answers, my own workaround was to start the coroutine that contains this code:

    async with mqtt_client.client.filtered_messages(self.sub_topic) as messages:
        logging.debug("I am inside the yield generator")
        async for message in messages:
            logger.info(f"{self.device.id} Message: {message.payload.decode('utf-8')}")
            try:
                if self._check_filter(message):
                    logger.info(f"{self.device.id}: Filter passed")
                    correct = True
                    break
                else:
                    correct = False
                    logger.info(f"{self.device.id}: Ignoring")
            except Exception as Ex:
                logger.warning("Exception ")
                logger.warning(Ex)

before sending the message that triggers the reply I want to read:

fut = asyncio.create_task(receiving_coro())
await send_message()
await fut

Instead of:

await send_message()
await receiving_coro()

Obviously this works because, in my case, there is no message on that topic until I send a message on another one with send_message(). However, this solution will not work if a message appears on that topic before create_task() is called.
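A slightly stricter variant of the same idea is to wait until the listener is actually registered before sending. This is only a sketch under assumptions: FakeTopic, receive_one, and request_reply are hypothetical names, the topic drops messages when nobody listens, and the asyncio.Event stands in for "the filtered_messages() context has been entered".

```python
import asyncio


class FakeTopic:
    """Hypothetical topic that drops messages when nobody is listening."""

    def __init__(self):
        self.queues = []

    def publish(self, payload):
        for queue in self.queues:
            queue.put_nowait(payload)


async def receive_one(topic, ready):
    queue = asyncio.Queue()
    topic.queues.append(queue)  # stands in for entering filtered_messages()
    ready.set()                 # the listener is registered from here on
    try:
        return await queue.get()
    finally:
        topic.queues.remove(queue)


async def send_message(topic):
    # Hypothetical request whose reply arrives immediately.
    topic.publish(b"ack")


async def request_reply():
    topic = FakeTopic()
    ready = asyncio.Event()
    receiver = asyncio.create_task(receive_one(topic, ready))
    await ready.wait()          # do not send until the listener exists
    await send_message(topic)
    return await asyncio.wait_for(receiver, timeout=1.0)


if __name__ == "__main__":
    print(asyncio.run(request_reply()))
```

The event removes the dependence on task scheduling order, but it has the same limitation: a message published before the listener exists is still missed.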

I will test the options you gave me and let you know.
